This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b48f8a11d Harry model that supports value overrides: an ability to 
provide specific values for clustering, regular, and static columns
6b48f8a11d is described below

commit 6b48f8a11dbad8c0653309eb8193fa6157bba5d8
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Wed Jan 17 19:12:43 2024 +0100

    Harry model that supports value overrides: an ability to provide specific 
values for clustering, regular, and static columns
    
    Patch by Alex Petrov; reviewed by Caleb Rackliffe for CASSANDRA-19284
---
 .../cassandra/distributed/shared/ClusterUtils.java |   2 +
 .../fuzz/harry/examples/RepairBurnTest.java        | 138 ++++++++
 .../dsl/HistoryBuilderIntegrationTest.java         | 153 ++++++---
 .../HistoryBuilderOverridesIntegrationTest.java    | 359 +++++++++++++++++++++
 .../integration/model/IntegrationTestBase.java     |  12 +-
 .../model/ReconcilerIntegrationTest.java           |  23 +-
 .../fuzz/ring/ConsistentBootstrapTest.java         |   8 +-
 .../cassandra/fuzz/sai/SingleNodeSAITest.java      |  11 +-
 .../cassandra/fuzz/sai/StaticsTortureTest.java     |  20 +-
 .../cassandra/harry/checker/ModelChecker.java      |   1 +
 .../org/apache/cassandra/harry/ddl/ColumnSpec.java |  44 ++-
 .../org/apache/cassandra/harry/ddl/SchemaSpec.java |  36 ++-
 .../apache/cassandra/harry/dsl/ArrayWrapper.java   |  49 +++
 .../cassandra/harry/dsl/BatchVisitBuilder.java     |  11 +-
 .../apache/cassandra/harry/dsl/HistoryBuilder.java | 141 +++++---
 .../cassandra/harry/dsl/OverridingBijection.java   |  84 +++++
 .../cassandra/harry/dsl/OverridingCkGenerator.java | 153 +++++++++
 .../cassandra/harry/dsl/PartitionVisitState.java   |  63 ++--
 .../harry/dsl/PartitionVisitStateImpl.java         | 115 +++++++
 .../harry/dsl/ReplayingHistoryBuilder.java         |  13 +-
 .../harry/dsl/SingleOperationBuilder.java          |   5 +-
 .../harry/dsl/SingleOperationVisitBuilder.java     |  72 +++--
 .../harry/dsl/ValueDescriptorIndexGenerator.java   |  13 +-
 .../apache/cassandra/harry/dsl/ValueHelper.java    |  74 +++++
 .../apache/cassandra/harry/dsl/ValueOverrides.java |  24 ++
 .../apache/cassandra/harry/gen/DataGenerators.java |  24 +-
 .../apache/cassandra/harry/model/NoOpChecker.java  |  16 +-
 .../cassandra/harry/operations/Relation.java       |   2 +-
 28 files changed, 1445 insertions(+), 221 deletions(-)

diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java 
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 3d3b9f3958..3e60a02523 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -545,6 +545,8 @@ public class ClusterUtils
 
     public static void unpauseCommits(IInvokableInstance instance)
     {
+        if (instance.isShutdown())
+            return;
         instance.runOnInstance(() -> {
             TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
             processor.unpause();
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/harry/examples/RepairBurnTest.java 
b/test/distributed/org/apache/cassandra/fuzz/harry/examples/RepairBurnTest.java
new file mode 100644
index 0000000000..4092d6a02f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/fuzz/harry/examples/RepairBurnTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.harry.examples;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
+import org.apache.cassandra.harry.checker.ModelChecker;
+import org.apache.cassandra.harry.ddl.ColumnSpec;
+import org.apache.cassandra.harry.ddl.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
+import org.apache.cassandra.harry.model.Model;
+import org.apache.cassandra.harry.operations.Query;
+import org.apache.cassandra.harry.sut.SystemUnderTest;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.sut.injvm.InJvmSutBase;
+import org.apache.cassandra.harry.tracker.DataTracker;
+import org.apache.cassandra.harry.tracker.DefaultDataTracker;
+import org.apache.cassandra.harry.visitors.ReplayingVisitor;
+
+public class RepairBurnTest extends IntegrationTestBase
+{
+    @BeforeClass
+    public static void before() throws Throwable
+    {
+        init(3,
+             (cfg) -> 
InJvmSutBase.defaultConfig().accept(cfg.with(Feature.NETWORK, Feature.GOSSIP)));
+    }
+
+    private final long seed = 1L;
+
+    @Test
+    public void repairBurnTest() throws Throwable
+    {
+        SchemaSpec schema = new SchemaSpec("repair_burn_test",
+                                           "test_overrides",
+                                           Arrays.asList(
+                                           ColumnSpec.pk("pk1", 
ColumnSpec.asciiType(4, 10)),
+                                           ColumnSpec.pk("pk2", 
ColumnSpec.int64Type),
+                                           ColumnSpec.pk("pk3", 
ColumnSpec.int64Type),
+                                           ColumnSpec.pk("pk4", 
ColumnSpec.asciiType(2, 10))),
+                                           Arrays.asList(
+                                           ColumnSpec.ck("ck1", 
ColumnSpec.asciiType(2, 0)),
+                                           ColumnSpec.ck("ck2", 
ColumnSpec.asciiType(2, 0)),
+                                           ColumnSpec.ck("ck3", 
ColumnSpec.int64Type),
+                                           ColumnSpec.ck("ck4", 
ColumnSpec.asciiType(4, 100)),
+                                           ColumnSpec.ck("ck5", 
ColumnSpec.asciiType(8, 100))
+                                           ),
+                                           Arrays.asList(
+                                           
ColumnSpec.regularColumn("regular1", ColumnSpec.asciiType(8, 100)),
+                                           
ColumnSpec.regularColumn("regular2", ColumnSpec.asciiType(8, 100)),
+                                           
ColumnSpec.regularColumn("regular3", ColumnSpec.asciiType(8, 100))
+                                           ),
+                                           Arrays.asList(
+                                           ColumnSpec.staticColumn("static1", 
ColumnSpec.asciiType(8, 100)),
+                                           ColumnSpec.staticColumn("static2", 
ColumnSpec.asciiType(8, 100)),
+                                           ColumnSpec.staticColumn("static3", 
ColumnSpec.asciiType(8, 100))
+                                           ));
+
+        sut.schemaChange("CREATE KEYSPACE " + schema.keyspace + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+        sut.schemaChange(schema.compile().cql());
+
+        ModelChecker<HistoryBuilder> modelChecker = new ModelChecker<>();
+        JdkRandomEntropySource rng = new JdkRandomEntropySource(new 
Random(seed));
+        DataTracker tracker = new DefaultDataTracker();
+
+        TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(3);
+
+        int maxPartitionSize = 10;
+        int partitions = 1000;
+
+        modelChecker.init(new HistoryBuilder(seed, maxPartitionSize, 10, 
schema, rf))
+                    .step((history) -> {
+                        history.visitPartition(rng.nextInt(partitions),
+                                               (ps) -> {
+                                                   Object[][] clusterings = 
new Object[maxPartitionSize][];
+                                                   for (int i = 0; i < 
clusterings.length; i++)
+                                                   {
+                                                       Object[] v = 
schema.ckGenerator.inflate(rng.next());
+                                                       for (int j = 0; j < 
v.length; j++)
+                                                       {
+                                                           if 
(rng.nextBoolean() && v[j] instanceof String)
+                                                           {
+                                                               v[j] = "";
+                                                               return;
+                                                           }
+                                                       }
+                                                       clusterings[i] = v;
+                                                   }
+                                                   
ps.overrideClusterings(clusterings);
+                                               })
+                               .insert(rng.nextInt(maxPartitionSize));
+                    })
+                    .step((history) -> {
+                        history.visitPartition(rng.nextInt(partitions))
+                               .deleteRow(rng.nextInt(maxPartitionSize));
+                    })
+                    .exitCondition((history) -> {
+                        if (history.size() < 10_000)
+                            return false;
+
+                        ReplayingVisitor visitor = history.visitor(tracker, 
sut, SystemUnderTest.ConsistencyLevel.NODE_LOCAL);
+                        visitor.replayAll();
+
+                        cluster.get(1).nodetool("repair", "--full");
+
+                        Model model = history.quiescentLocalChecker(tracker, 
sut);
+
+                        for (Long pd : history.visitedPds())
+                            
model.validate(Query.selectPartition(history.schema(), pd, false));
+
+                        return true;
+                    })
+                    .run(Integer.MAX_VALUE, seed);
+    }
+}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/harry/integration/dsl/HistoryBuilderIntegrationTest.java
 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/dsl/HistoryBuilderIntegrationTest.java
index bf5cfde121..e64835ac6f 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/harry/integration/dsl/HistoryBuilderIntegrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/dsl/HistoryBuilderIntegrationTest.java
@@ -19,46 +19,31 @@
 package org.apache.cassandra.fuzz.harry.integration.dsl;
 
 import java.util.Random;
-import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import org.junit.Test;
 
-import org.apache.cassandra.fuzz.harry.integration.model.ModelTestBase;
+import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
 import org.apache.cassandra.harry.checker.ModelChecker;
-import org.apache.cassandra.harry.core.Configuration;
-import org.apache.cassandra.harry.core.Run;
 import org.apache.cassandra.harry.ddl.SchemaGenerators;
 import org.apache.cassandra.harry.ddl.SchemaSpec;
 import org.apache.cassandra.harry.dsl.BatchVisitBuilder;
 import org.apache.cassandra.harry.dsl.HistoryBuilder;
 import org.apache.cassandra.harry.dsl.SingleOperationBuilder;
-import org.apache.cassandra.harry.dsl.ValueDescriptorIndexGenerator;
 import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
 import org.apache.cassandra.harry.model.Model;
 import org.apache.cassandra.harry.operations.Query;
 import org.apache.cassandra.harry.sut.SystemUnderTest;
 import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.tracker.DataTracker;
+import org.apache.cassandra.harry.tracker.DefaultDataTracker;
 import org.apache.cassandra.harry.visitors.ReplayingVisitor;
 
-public class HistoryBuilderIntegrationTest extends ModelTestBase
+public class HistoryBuilderIntegrationTest extends IntegrationTestBase
 {
-    private final long seed = 1L;
-    private final int STEPS_PER_ITERATION = 1_000;
-    private final int MAX_PARTITIONS = 100;
-
-    @Override
-    protected Configuration.ModelConfiguration modelConfiguration()
-    {
-        return new Configuration.QuiescentCheckerConfig();
-    }
-
-    public Configuration.ConfigurationBuilder configuration(long seed, 
SchemaSpec schema)
-    {
-        return super.configuration(seed, schema)
-                    .setPartitionDescriptorSelector(new 
Configuration.DefaultPDSelectorConfiguration(2, 2))
-                    .setClusteringDescriptorSelector((builder) -> 
builder.setOperationsPerLtsDistribution(new 
Configuration.ConstantDistributionConfig(100_000)));
-    }
+    private static final long SEED = 1L;
+    private static final int STEPS_PER_ITERATION = 1_000;
+    private static final int MAX_PARTITIONS = 50;
 
     @Test
     public void simpleDSLTest() throws Throwable
@@ -67,46 +52,36 @@ public class HistoryBuilderIntegrationTest extends 
ModelTestBase
         for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
         {
             SchemaSpec schema = supplier.get();
-            Configuration config = configuration(i, schema).build();
-
-            Run run = config.createRun();
+            DataTracker tracker = new DefaultDataTracker();
             beforeEach();
-            run.sut.schemaChange(schema.compile().cql());
+            sut.schemaChange(schema.compile().cql());
 
             ModelChecker<SingleOperationBuilder> modelChecker = new 
ModelChecker<>();
-            JdkRandomEntropySource entropySource = new 
JdkRandomEntropySource(new Random(seed));
-
-            LongSupplier[] valueGenerators = new 
LongSupplier[run.schemaSpec.regularColumns.size()];
-            for (int j = 0; j < valueGenerators.length; j++)
-            {
-                valueGenerators[j] = new 
ValueDescriptorIndexGenerator(run.schemaSpec.regularColumns.get(j),
-                                                                       run.rng)
-                                     .toSupplier(entropySource.derive(), 20, 
0.2f);
-            }
+            JdkRandomEntropySource rng = new JdkRandomEntropySource(new 
Random(SEED));
 
             TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
 
             int maxPartitionSize = 100;
-            modelChecker.init(new HistoryBuilder(seed, maxPartitionSize, 10, 
schema, rf))
+            modelChecker.init(new HistoryBuilder(SEED, maxPartitionSize, 10, 
schema, rf))
                         .step((history) -> {
                             return history.insert();
                         })
                         .step((history) -> {
-                            return 
history.insert(entropySource.nextInt(maxPartitionSize));
+                            return 
history.insert(rng.nextInt(maxPartitionSize));
                         })
                         .step((history) -> {
-                            int row = entropySource.nextInt(maxPartitionSize);
-                            long[] vds = new long[valueGenerators.length];
-                            for (int j = 0; j < valueGenerators.length; j++)
-                                vds[j] = valueGenerators[j].getAsLong();
+                            int row = rng.nextInt(maxPartitionSize);
+                            long[] vIdxs = new 
long[schema.regularColumns.size()];
+                            for (int j = 0; j < schema.regularColumns.size(); 
j++)
+                                vIdxs[j] = rng.nextInt(20);
 
-                            return history.insert(row, vds);
+                            return history.insert(row, vIdxs);
                         })
                         .step((history) -> {
                             return history.deleteRow();
                         })
                         .step((history) -> {
-                            return 
history.deleteRow(entropySource.nextInt(maxPartitionSize));
+                            return 
history.deleteRow(rng.nextInt(maxPartitionSize));
                         })
                         .step(SingleOperationBuilder::deletePartition)
                         .step(SingleOperationBuilder::deleteColumns)
@@ -115,10 +90,10 @@ public class HistoryBuilderIntegrationTest extends 
ModelTestBase
                             return history.deleteRowRange();
                         })
                         .step((history) -> {
-                            return 
history.deleteRowRange(entropySource.nextInt(maxPartitionSize),
-                                                          
entropySource.nextInt(maxPartitionSize),
-                                                          
entropySource.nextBoolean(),
-                                                          
entropySource.nextBoolean());
+                            return 
history.deleteRowRange(rng.nextInt(maxPartitionSize),
+                                                          
rng.nextInt(maxPartitionSize),
+                                                          rng.nextBoolean(),
+                                                          rng.nextBoolean());
                         })
                         .step((history) -> history instanceof HistoryBuilder,
                               (history) -> ((HistoryBuilder) 
history).beginBatch())
@@ -129,20 +104,92 @@ public class HistoryBuilderIntegrationTest extends 
ModelTestBase
                                 return false;
 
                             HistoryBuilder historyBuilder = (HistoryBuilder) 
history;
-                            ReplayingVisitor visitor = 
historyBuilder.visitor(run.tracker, run.sut, 
SystemUnderTest.ConsistencyLevel.ALL);
+                            ReplayingVisitor visitor = 
historyBuilder.visitor(tracker, sut, SystemUnderTest.ConsistencyLevel.ALL);
+                            visitor.replayAll();
+
+                            if (historyBuilder.visitedPds().size() < 
MAX_PARTITIONS)
+                                return false;
+
+                            Model model = 
historyBuilder.quiescentChecker(tracker, sut);
+
+                            for (Long pd : historyBuilder.visitedPds())
+                                
model.validate(Query.selectPartition(historyBuilder.schema(), pd,false));
+
+                            return true;
+                        })
+                        .run(STEPS_PER_ITERATION, SEED);
+        }
+    }
+
+    @Test
+    public void overrideCkTest() throws Throwable
+    {
+        Supplier<SchemaSpec> supplier = 
SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int schemaIdx = 0; schemaIdx < SchemaGenerators.DEFAULT_RUNS; 
schemaIdx++)
+        {
+            SchemaSpec schema = supplier.get();
+            DataTracker tracker = new DefaultDataTracker();
+            beforeEach();
+            sut.schemaChange(schema.compile().cql());
+
+            ModelChecker<HistoryBuilder> modelChecker = new ModelChecker<>();
+            JdkRandomEntropySource rng = new JdkRandomEntropySource(new 
Random(SEED));
+
+            TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
+
+            int maxPartitionSize = 10;
+            modelChecker.init(new HistoryBuilder(SEED, maxPartitionSize, 10, 
schema, rf))
+                        .beforeAll((history) -> {
+                            for (int i = 0; i < MAX_PARTITIONS; i++)
+                                
history.forPartition(i).ensureClustering(schema.ckGenerator.inflate(rng.nextLong()));
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .insert();
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .insert(rng.nextInt(maxPartitionSize));
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .deleteRow();
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .deleteRow(rng.nextInt(maxPartitionSize));
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .deletePartition();
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .deleteColumns();
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .deleteRowRange();
+                        })
+                        .step((history) -> {
+                            history.visitPartition(rng.nextInt(MAX_PARTITIONS))
+                                   .deleteRowSlice();
+                        })
+                        .exitCondition((history) -> {
+                            ReplayingVisitor visitor = 
history.visitor(tracker, sut, SystemUnderTest.ConsistencyLevel.ALL);
                             visitor.replayAll();
 
-                            if (historyBuilder.presetSelector.pds().size() < 
MAX_PARTITIONS)
+                            if (history.visitedPds().size() < MAX_PARTITIONS)
                                 return false;
 
-                            Model model = 
historyBuilder.quiescentChecker(run.tracker, sut);
+                            Model model = history.quiescentChecker(tracker, 
sut);
 
-                            for (Long pd : historyBuilder.presetSelector.pds())
-                                
model.validate(Query.selectPartition(run.schemaSpec, pd,false));
+                            for (Long pd : history.visitedPds())
+                                
model.validate(Query.selectPartition(history.schema(), pd,false));
 
                             return true;
                         })
-                        .run(STEPS_PER_ITERATION, seed);
+                        .run(STEPS_PER_ITERATION, SEED);
         }
     }
 }
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/harry/integration/dsl/HistoryBuilderOverridesIntegrationTest.java
 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/dsl/HistoryBuilderOverridesIntegrationTest.java
new file mode 100644
index 0000000000..7625ac9f32
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/dsl/HistoryBuilderOverridesIntegrationTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.harry.integration.dsl;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
+import org.apache.cassandra.harry.ddl.ColumnSpec;
+import org.apache.cassandra.harry.ddl.SchemaGenerators;
+import org.apache.cassandra.harry.ddl.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.gen.Bijections;
+import org.apache.cassandra.harry.gen.EntropySource;
+import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
+import org.apache.cassandra.harry.operations.Query;
+import org.apache.cassandra.harry.sut.SystemUnderTest;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.tracker.DataTracker;
+import org.apache.cassandra.harry.tracker.DefaultDataTracker;
+import org.apache.cassandra.harry.visitors.ReplayingVisitor;
+
+public class HistoryBuilderOverridesIntegrationTest extends IntegrationTestBase
+{
+    private static final long SEED = 1L;
+
+    public static SchemaSpec SIMPLE_SCHEMA = new SchemaSpec("harry",
+                                                            "test_overrides",
+                                                            
Arrays.asList(ColumnSpec.pk("pk1", ColumnSpec.asciiType(4, 10)),
+                                                                          
ColumnSpec.pk("pk2", ColumnSpec.int64Type),
+                                                                          
ColumnSpec.pk("pk3", ColumnSpec.int64Type),
+                                                                          
ColumnSpec.pk("pk4", ColumnSpec.asciiType(2, 10))
+                                                            ),
+                                                            
Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType(2, 0)),
+                                                                          
ColumnSpec.ck("ck2", ColumnSpec.asciiType(2, 0)),
+                                                                          
ColumnSpec.ck("ck3", ColumnSpec.int64Type),
+                                                                          
ColumnSpec.ck("ck4", ColumnSpec.asciiType(4, 100)),
+                                                                          
ColumnSpec.ck("ck5", ColumnSpec.asciiType(8, 100))
+                                                            ),
+                                                            
Arrays.asList(ColumnSpec.regularColumn("regular1", ColumnSpec.asciiType(8, 
100)),
+                                                                          
ColumnSpec.regularColumn("regular2", ColumnSpec.asciiType(8, 100)),
+                                                                          
ColumnSpec.regularColumn("regular3", ColumnSpec.asciiType(8, 100))
+                                                            ),
+                                                            
Arrays.asList(ColumnSpec.staticColumn("static1", ColumnSpec.asciiType(8, 100)),
+                                                                          
ColumnSpec.staticColumn("static2", ColumnSpec.asciiType(8, 100)),
+                                                                          
ColumnSpec.staticColumn("static3", ColumnSpec.asciiType(8, 100))
+                                                            ));
+    @Test
+    public void simpleCkOverrideTest()
+    {
+        SchemaSpec schema = SIMPLE_SCHEMA;
+
+        DataTracker tracker = new DefaultDataTracker();
+        beforeEach();
+        sut.schemaChange(schema.compile().cql());
+
+        TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
+
+        HistoryBuilder history = new HistoryBuilder(SEED, 5, 10, schema, rf);
+        Object[] override = new Object[]{ "", "b", -1L, "c", "d" };
+        history.forPartition(1).ensureClustering(override);
+        for (int i = 0; i < 5; i++)
+            history.visitPartition(1).insert(i);
+
+        history.visitor(tracker, sut, 
SystemUnderTest.ConsistencyLevel.ALL).replayAll();
+
+        Object[][] res = sut.execute(Query.selectPartition(history.schema(), 
history.visitedPds().get(0), false).toSelectStatement(),
+                                     SystemUnderTest.ConsistencyLevel.ALL);
+        int found = 0;
+        for (Object[] row : res)
+        {
+            if (Arrays.equals(override, Arrays.copyOfRange(row, 4, 9)))
+                found++;
+        }
+        Assert.assertEquals("Should have mutated exactly one CK", found, 1);
+
+        history.validateAll(tracker, sut);
+    }
+
+    @Test
+    public void ckOverrideSortingTest()
+    {
+        for (boolean reverse : new boolean[]{ true, false })
+        {
+            SchemaSpec schema = new SchemaSpec("harry",
+                                               "test_overrides" + (reverse ? 
"_reverse" : ""),
+                                               
Arrays.asList(ColumnSpec.pk("pk1", ColumnSpec.asciiType(4, 10))),
+                                               
Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType(2, 0), reverse)),
+                                               
Arrays.asList(ColumnSpec.regularColumn("regular1", ColumnSpec.asciiType(8, 
100))),
+                                               
Arrays.asList(ColumnSpec.staticColumn("static1", ColumnSpec.asciiType(8, 
100))));
+
+            DataTracker tracker = new DefaultDataTracker();
+            beforeEach();
+            sut.schemaChange(schema.compile().cql());
+
+            TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
+
+            int partitionSize = 10;
+            HistoryBuilder history = new HistoryBuilder(SEED, partitionSize, 
10, schema, rf);
+            ReplayingVisitor visitor = history.visitor(tracker, sut, 
SystemUnderTest.ConsistencyLevel.ALL);
+            Set<Integer> foundAt = new HashSet<>();
+            for (int pdIdx = 0; pdIdx < 128; pdIdx++)
+            {
+                Object[] override = new Object[]{ Character.toString(pdIdx) };
+                history.forPartition(pdIdx).ensureClustering(override);
+                for (int i = 0; i < partitionSize; i++)
+                    history.visitPartition(pdIdx).insert(i);
+
+                visitor.replayAll();
+                long visitedPd = history.forPartition(pdIdx).pd();
+                {
+                    Object[][] res = 
sut.execute(Query.selectPartition(history.schema(), visitedPd, 
false).toSelectStatement(),
+                                                 
SystemUnderTest.ConsistencyLevel.ALL);
+
+                    int found = 0;
+                    for (int i = 0; i < res.length; i++)
+                    {
+                        Object[] row = res[i];
+                        if (Arrays.equals(override, Arrays.copyOfRange(row, 1, 
2)))
+                        {
+                            found++;
+                            foundAt.add(i);
+                        }
+                    }
+                        Assert.assertEquals("Should have mutated exactly one CK", 1, found);
+                }
+                history.validateAll(tracker, sut);
+            }
+            Assert.assertEquals(10, foundAt.size());
+        }
+    }
+
+    @Test
+    public void ckOverrideManySortingTest()
+    {
+        int counter = 0;
+        for (boolean reverse : new boolean[]{ true, false })
+        {
+            for (ColumnSpec.DataType type : new ColumnSpec.DataType[]{ 
ColumnSpec.asciiType(2, 0), ColumnSpec.int64Type })
+            {
+                SchemaSpec schema = new SchemaSpec("harry",
+                                                   "test_overrides" + 
(counter++),
+                                                   
Arrays.asList(ColumnSpec.pk("pk1", ColumnSpec.asciiType(4, 10))),
+                                                   
Arrays.asList(ColumnSpec.ck("ck1", type, reverse)),
+                                                   
Arrays.asList(ColumnSpec.regularColumn("regular1", ColumnSpec.asciiType(8, 
100))),
+                                                   
Arrays.asList(ColumnSpec.staticColumn("static1", ColumnSpec.asciiType(8, 
100))));
+
+                DataTracker tracker = new DefaultDataTracker();
+                beforeEach();
+                sut.schemaChange(schema.compile().cql());
+
+                TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
+
+                int partitionSize = 10;
+                HistoryBuilder history = new HistoryBuilder(SEED, 
partitionSize, 10, schema, rf);
+                ReplayingVisitor visitor = history.visitor(tracker, sut, 
SystemUnderTest.ConsistencyLevel.ALL);
+                EntropySource rng = new JdkRandomEntropySource(SEED);
+                for (int pdIdx = 0; pdIdx < 100; pdIdx++)
+                {
+                    Set<Object> overrides = new HashSet<>();
+                    for (int i = 0; i < 5; i++)
+                    {
+                        Object override = 
schema.clusteringKeys.get(0).generator().inflate(rng.next());
+                        try
+                        {
+                            history.forPartition(pdIdx).ensureClustering(new 
Object[]{ override });
+                            overrides.add(override);
+                        }
+                        catch (IllegalStateException t)
+                        {
+                            // could not override twice
+                        }
+                    }
+
+                    for (int i = 0; i < partitionSize; i++)
+                        history.visitPartition(pdIdx).insert(i);
+
+                    visitor.replayAll();
+                    long visitedPd = history.forPartition(pdIdx).pd();
+                    {
+                        Object[][] res = 
sut.execute(Query.selectPartition(history.schema(), visitedPd, 
false).toSelectStatement(),
+                                                     
SystemUnderTest.ConsistencyLevel.ALL);
+
+                        int found = 0;
+                        for (int i = 0; i < res.length; i++)
+                        {
+                            Object[] row = res[i];
+                            Object v = row[1];
+                            if (overrides.contains(v))
+                                found++;
+                        }
+                        Assert.assertEquals("Should have mutated all overridden CKs", overrides.size(), found);
+                    }
+                    history.validateAll(tracker, sut);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void ckOverrideWithDeleteTestSingleColumn()
+    {
+        Supplier<SchemaSpec> supplier = 
SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+
+        int partitionSize = 5;
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
+        {
+            SchemaSpec schema = supplier.get();
+
+
+            DataTracker tracker = new DefaultDataTracker();
+            beforeEach();
+            sut.schemaChange(schema.compile().cql());
+
+            TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
+
+            HistoryBuilder history = new HistoryBuilder(SEED, partitionSize, 
1, schema, rf);
+            ReplayingVisitor visitor = history.visitor(tracker, sut, 
SystemUnderTest.ConsistencyLevel.ALL);
+
+            EntropySource rng = new JdkRandomEntropySource(SEED);
+            for (int i = 0; i < partitionSize; i++)
+            {
+                history.visitPartition(1,
+                                       (e) -> 
schema.ckGenerator.inflate(rng.next()))
+                       .insert(i);
+            }
+
+            for (int i = 0; i < partitionSize; i++)
+            {
+                history.visitPartition(1)
+                       .deleteRow(i);
+
+                visitor.replayAll();
+                history.validateAll(tracker, sut);
+            }
+        }
+    }
+
+    @Test
+    public void regularAndStaticOverrideTest()
+    {
+        for (ColumnSpec.DataType<?> type : new ColumnSpec.DataType[]{ 
ColumnSpec.asciiType(2, 0), ColumnSpec.int64Type })
+        {
+            SchemaSpec schema = new SchemaSpec("harry",
+                                               "test_overrides",
+                                               
Arrays.asList(ColumnSpec.pk("pk1", ColumnSpec.asciiType(4, 10))),
+                                               
Arrays.asList(ColumnSpec.ck("ck1", type, false)),
+                                               
Arrays.asList(ColumnSpec.regularColumn("regular1", ColumnSpec.asciiType(2, 2)),
+                                                             
ColumnSpec.regularColumn("regular2", ColumnSpec.int64Type)),
+                                               
Arrays.asList(ColumnSpec.staticColumn("static1", ColumnSpec.asciiType(2, 2)),
+                                                             
ColumnSpec.staticColumn("static2", ColumnSpec.int64Type)
+                                               ));
+
+            Map<String, Bijections.Bijection<?>> reGenerators = new 
HashMap<>();
+            reGenerators.put("regular1", ColumnSpec.asciiType(4, 
4).generator());
+            reGenerators.put("regular2", ColumnSpec.int64Type.generator());
+            reGenerators.put("static1", ColumnSpec.asciiType(8, 
4).generator());
+            reGenerators.put("static2", ColumnSpec.int64Type.generator());
+
+            DataTracker tracker = new DefaultDataTracker();
+            beforeEach();
+            sut.schemaChange(schema.compile().cql());
+
+            TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
+
+            int partitionSize = 100;
+            HistoryBuilder history = new HistoryBuilder(SEED, partitionSize, 
10, schema, rf);
+            ReplayingVisitor visitor = history.visitor(tracker, sut, 
SystemUnderTest.ConsistencyLevel.ALL);
+            EntropySource rng = new JdkRandomEntropySource(SEED);
+
+            Map<String, Set<Object>> perColumnOverrides = new HashMap<>();
+            for (ColumnSpec<?> column : schema.regularColumns)
+            {
+                perColumnOverrides.put(column.name, new HashSet<>());
+                for (int i = 0; i < partitionSize; i++)
+                {
+                    Object override = 
reGenerators.get(column.name).inflate(rng.next());
+                    history.valueOverrides().override(column.name, i, 
override);
+                    perColumnOverrides.get(column.name).add(override);
+                }
+            }
+
+            for (ColumnSpec<?> column : schema.staticColumns)
+            {
+                perColumnOverrides.put(column.name, new HashSet<>());
+                for (int i = 0; i < partitionSize; i++)
+                {
+                    Object override = 
reGenerators.get(column.name).inflate(rng.next());
+                    history.valueOverrides().override(column.name, i, 
override);
+                    perColumnOverrides.get(column.name).add(override);
+                }
+            }
+            for (int pdIdx = 0; pdIdx < 10; pdIdx++)
+            {
+                Map<String, Set<Object>> results = new HashMap<>();
+                for (ColumnSpec<?> column : schema.regularColumns)
+                    results.put(column.name, new HashSet<>());
+                for (ColumnSpec<?> column : schema.staticColumns)
+                    results.put(column.name, new HashSet<>());
+
+                for (int i = 0; i < partitionSize; i++)
+                {
+                    history.visitPartition(pdIdx)
+                           .insert(i,
+                                   new long[]{ rng.nextInt(100), 
rng.nextInt(100) },
+                                   new long[]{ rng.nextInt(100), 
rng.nextInt(100) });
+                }
+
+                visitor.replayAll();
+                history.validateAll(tracker, sut);
+
+                long visitedPd = history.forPartition(pdIdx).pd();
+                Object[][] res = 
sut.execute(Query.selectPartition(history.schema(), visitedPd, 
false).toSelectStatement(),
+                                             
SystemUnderTest.ConsistencyLevel.ALL);
+
+                for (int i = 0; i < res.length; i++)
+                {
+                    Object[] row = res[i];
+                    results.get("regular1").add(row[4]);
+                    results.get("regular2").add(row[5]);
+                    results.get("static1").add(row[2]);
+                    results.get("static2").add(row[3]);
+                }
+
+                for (Map.Entry<String, Set<Object>> e : results.entrySet())
+                {
+                    for (Object o : e.getValue())
+                    {
+                        Assert.assertTrue(String.format("Found a non-overridden value for %s: %s", e.getKey(), o),
+                                          perColumnOverrides.get(e.getKey()).contains(o));
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/IntegrationTestBase.java
 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/IntegrationTestBase.java
index fd410cef73..7a6a926bed 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/IntegrationTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/IntegrationTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.fuzz.harry.integration.model;
 
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import org.junit.AfterClass;
@@ -27,6 +28,7 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.harry.core.Configuration;
 import org.apache.cassandra.harry.ddl.SchemaGenerators;
 import org.apache.cassandra.harry.ddl.SchemaSpec;
@@ -45,10 +47,15 @@ public class IntegrationTestBase extends TestBaseImpl
 
     @BeforeClass
     public static void before() throws Throwable
+    {
+        init(1, InJvmSutBase.defaultConfig());
+    }
+
+    protected static void init(int nodes, Consumer<IInstanceConfig> cfg) 
throws Throwable
     {
         cluster = Cluster.build()
-                         .withNodes(1)
-                         .withConfig(InJvmSutBase.defaultConfig())
+                         .withNodes(nodes)
+                         .withConfig(cfg)
                          .createWithoutStarting();
         cluster.setUncaughtExceptionsFilter(t -> {
             logger.error("Caught exception, reporting during shutdown. 
Ignoring.", t);
@@ -58,7 +65,6 @@ public class IntegrationTestBase extends TestBaseImpl
         cluster = init(cluster);
         sut = new InJvmSut(cluster);
     }
-
     @AfterClass
     public static void afterClass()
     {
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/ReconcilerIntegrationTest.java
 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/ReconcilerIntegrationTest.java
index fecc321bbb..54fe5fe8a6 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/ReconcilerIntegrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/harry/integration/model/ReconcilerIntegrationTest.java
@@ -25,15 +25,14 @@ import org.junit.Test;
 import org.apache.cassandra.harry.ddl.ColumnSpec;
 import org.apache.cassandra.harry.ddl.SchemaSpec;
 import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
-import org.apache.cassandra.harry.gen.DataGenerators;
+import org.apache.cassandra.harry.dsl.ValueDescriptorIndexGenerator;
 import org.apache.cassandra.harry.sut.SystemUnderTest;
 import org.apache.cassandra.harry.sut.TokenPlacementModel;
 import org.apache.cassandra.harry.tracker.DefaultDataTracker;
 
 public class ReconcilerIntegrationTest extends IntegrationTestBase
 {
-    private final long seed = 1; // 88
-
+    private static final long SEED = 1; // 88
 
     @Test
     public void testTrackingWithStatics() throws Throwable
@@ -51,15 +50,15 @@ public class ReconcilerIntegrationTest extends 
IntegrationTestBase
         sut.schemaChange(schema.compile().cql());
 
         TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
-        ReplayingHistoryBuilder historyBuilder = new 
ReplayingHistoryBuilder(seed, 100, 1, new DefaultDataTracker(), sut, schema, 
rf, SystemUnderTest.ConsistencyLevel.QUORUM);
+        ReplayingHistoryBuilder historyBuilder = new 
ReplayingHistoryBuilder(SEED, 100, 1, new DefaultDataTracker(), sut, schema, 
rf, SystemUnderTest.ConsistencyLevel.QUORUM);
         historyBuilder.visitPartition(1).insert(1,
-                                                new long[]{ 
DataGenerators.UNSET_DESCR, DataGenerators.UNSET_DESCR },
-                                                new long[]{ 1L, 1L });
+                                                new long[]{ 
ValueDescriptorIndexGenerator.UNSET, ValueDescriptorIndexGenerator.UNSET },
+                                                new long[]{ 1, 1 });
         historyBuilder.validate(1);
 
         historyBuilder.visitPartition(2).insert(2,
-                                                new long[]{ 1L, 1L },
-                                                new long[]{ 1L, 1L });
+                                                new long[]{ 1, 1 },
+                                                new long[]{ 1, 1 });
         historyBuilder.visitPartition(2).deleteRowRange(1, 3, true, true);
         historyBuilder.validate(2);
     }
@@ -79,15 +78,15 @@ public class ReconcilerIntegrationTest extends 
IntegrationTestBase
         sut.schemaChange(schema.compile().cql());
 
         TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
-        ReplayingHistoryBuilder historyBuilder = new 
ReplayingHistoryBuilder(seed, 100, 1, new DefaultDataTracker(), sut, schema, 
rf, SystemUnderTest.ConsistencyLevel.QUORUM);
+        ReplayingHistoryBuilder historyBuilder = new 
ReplayingHistoryBuilder(SEED, 100, 1, new DefaultDataTracker(), sut, schema, 
rf, SystemUnderTest.ConsistencyLevel.QUORUM);
                                                                              
historyBuilder.visitPartition(2).insert(2,
-                                                new long[]{ 1L, 1L });
+                                                new long[]{ 1, 1 });
         historyBuilder.visitPartition(2).deleteRowRange(1, 3, true, true);
         historyBuilder.validate(2);
 
-        historyBuilder = new ReplayingHistoryBuilder(seed, 100, 1, new 
DefaultDataTracker(), sut, schema, rf, SystemUnderTest.ConsistencyLevel.QUORUM);
+        historyBuilder = new ReplayingHistoryBuilder(SEED, 100, 1, new 
DefaultDataTracker(), sut, schema, rf, SystemUnderTest.ConsistencyLevel.QUORUM);
         historyBuilder.visitPartition(2).insert(2,
-                                                new long[]{ 1L, 1L });
+                                                new long[]{ 1, 1 });
         historyBuilder.visitPartition(2).deleteRowRange(1, 3, true, true);
         historyBuilder.validate(2);
     }
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/ring/ConsistentBootstrapTest.java 
b/test/distributed/org/apache/cassandra/fuzz/ring/ConsistentBootstrapTest.java
index 592d88be51..f5f5ef0488 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/ring/ConsistentBootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/ring/ConsistentBootstrapTest.java
@@ -58,7 +58,9 @@ public class ConsistentBootstrapTest extends FuzzTestBase
         try (Cluster cluster = builder().withNodes(3)
                                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
                                         
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
-                                        .withConfig((config) -> 
config.with(Feature.NETWORK, Feature.GOSSIP).set("metadata_snapshot_frequency", 
5))
+                                        .withConfig((config) -> 
config.with(Feature.NETWORK, Feature.GOSSIP)
+                                                                      
.set("write_request_timeout", "10s")
+                                                                      
.set("metadata_snapshot_frequency", 5))
                                         .start())
         {
             cmsInstance = cluster.get(1);
@@ -119,7 +121,9 @@ public class ConsistentBootstrapTest extends FuzzTestBase
         try (Cluster cluster = builder().withNodes(3)
                                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
                                         
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
-                                        .withConfig((config) -> 
config.with(Feature.NETWORK, Feature.GOSSIP).set("metadata_snapshot_frequency", 
5))
+                                        .withConfig((config) -> 
config.with(Feature.NETWORK, Feature.GOSSIP)
+                                                                      
.set("write_request_timeout", "10s")
+                                                                      
.set("metadata_snapshot_frequency", 5))
                                         .start())
         {
             cmsInstance = cluster.get(1);
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
index f0fdf6c819..09b215d710 100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
@@ -75,11 +75,8 @@ public class SingleNodeSAITest extends IntegrationTestBase
                                            
Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)),
                                                          
ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
                                                          
ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)),
-                                           
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))),
-                                           false,
-                                           false,
-                                           "LeveledCompactionStrategy",
-                                           false);
+                                           
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))))
+                            
.withCompactionStrategy("LeveledCompactionStrategy");
 
         sut.schemaChange(schema.compile().cql());
         sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table + 
"_debug").compile().cql());
@@ -218,9 +215,9 @@ public class SingleNodeSAITest extends IntegrationTestBase
                                                         
values[random.nextInt(values.length)]));
                     }
 
-                    long pd = 
history.presetSelector.pdAtPosition(partitionIndex);
+                    long pd = 
history.pdSelector().pdAtPosition(partitionIndex);
                     FilteringQuery query = new FilteringQuery(pd, false, 
relations, schema);
-                    Reconciler reconciler = new 
Reconciler(history.presetSelector, schema, history::visitor);
+                    Reconciler reconciler = new 
Reconciler(history.pdSelector(), schema, history::visitor);
                     Set<ColumnSpec<?>> columns = new 
HashSet<>(schema.allColumns);
 
                     PartitionState modelState = 
reconciler.inflatePartitionState(pd, tracker, query).filter(query);
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java
index 6c24c2fff9..e68c204d23 100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java
@@ -51,19 +51,20 @@ import 
org.apache.cassandra.harry.tracker.DefaultDataTracker;
 
 public class StaticsTortureTest extends IntegrationTestBase
 {
+    private static final long SEED = 1;
     private static final int MAX_PARTITION_SIZE = 10_000;
     private static final int NUM_PARTITIONS = 100;
     private static final int UNIQUE_CELL_VALUES = 5;
 
-    long seed = 1;
-
     @Test
     public void staticsTortureTest()
     {
         CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6);
+        int idx = 0;
         staticsTortureTest(Arrays.asList(ColumnSpec.ck("ck1", 
ColumnSpec.asciiType(4, 100)),
                                          ColumnSpec.ck("ck2", 
ColumnSpec.asciiType),
-                                         ColumnSpec.ck("ck3", 
ColumnSpec.int64Type)));
+                                         ColumnSpec.ck("ck3", 
ColumnSpec.int64Type)),
+                           idx++);
 
         for (boolean b1 : new boolean[]{ true, false })
             for (boolean b2 : new boolean[]{ true, false })
@@ -71,13 +72,14 @@ public class StaticsTortureTest extends IntegrationTestBase
                 {
                     staticsTortureTest(Arrays.asList(ColumnSpec.ck("ck1", 
ColumnSpec.asciiType(4, 100), b1),
                                                      ColumnSpec.ck("ck2", 
ColumnSpec.asciiType, b2),
-                                                     ColumnSpec.ck("ck3", 
ColumnSpec.int64Type, b3)));
+                                                     ColumnSpec.ck("ck3", 
ColumnSpec.int64Type, b3)),
+                                       idx++);
                 }
     }
 
-    public void staticsTortureTest(List<ColumnSpec<?>> cks)
+    public void staticsTortureTest(List<ColumnSpec<?>> cks, int idx)
     {
-        SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl" + (seed++),
+        SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl" + idx,
                                            Arrays.asList(ColumnSpec.ck("pk1", 
ColumnSpec.int64Type),
                                                          ColumnSpec.ck("pk2", 
ColumnSpec.asciiType(4, 100)),
                                                          ColumnSpec.ck("pk3", 
ColumnSpec.int64Type)),
@@ -132,7 +134,7 @@ public class StaticsTortureTest extends IntegrationTestBase
                                        schema.staticColumns.get(2).name));
         DataTracker tracker = new DefaultDataTracker();
         TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(cluster.size());
-        ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed,
+        ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(SEED + 
idx,
                                                                       
MAX_PARTITION_SIZE,
                                                                       100,
                                                                       tracker,
@@ -201,8 +203,8 @@ public class StaticsTortureTest extends IntegrationTestBase
 
         for (int pdx = 0; pdx < NUM_PARTITIONS; pdx++)
         {
-            long pd = history.presetSelector.pdAtPosition(pdx);
-            history.presetSelector.pdAtPosition(1);
+            long pd = history.pdSelector().pdAtPosition(pdx);
+            history.pdSelector().pdAtPosition(1);
             for (int i1 = 0; i1 < values.length; i1++)
                 for (int i2 = 0; i2 < values.length; i2++)
                     for (int i3 = 0; i3 < values.length; i3++)
diff --git 
a/test/harry/main/org/apache/cassandra/harry/checker/ModelChecker.java 
b/test/harry/main/org/apache/cassandra/harry/checker/ModelChecker.java
index 829075d871..74bf441000 100644
--- a/test/harry/main/org/apache/cassandra/harry/checker/ModelChecker.java
+++ b/test/harry/main/org/apache/cassandra/harry/checker/ModelChecker.java
@@ -69,6 +69,7 @@ public class ModelChecker<STATE>
         if (afterAll != null)
             afterAll.accept(state.get());
     }
+
     public ModelChecker<STATE> init(STATE state)
     {
         this.init = state;
diff --git a/test/harry/main/org/apache/cassandra/harry/ddl/ColumnSpec.java 
b/test/harry/main/org/apache/cassandra/harry/ddl/ColumnSpec.java
index 5d1109335b..88bd95d6a9 100644
--- a/test/harry/main/org/apache/cassandra/harry/ddl/ColumnSpec.java
+++ b/test/harry/main/org/apache/cassandra/harry/ddl/ColumnSpec.java
@@ -46,6 +46,33 @@ public class ColumnSpec<T>
         this.kind = kind;
     }
 
+
+    public ColumnSpec<T> override(Bijections.Bijection<T> override)
+    {
+        return new ColumnSpec<>(name,
+                                new DataType<>(type.cqlName) {
+                                    @Override
+                                    public int compareLexicographically(long 
l, long r)
+                                    {
+                                        return 
type.compareLexicographically(l, r);
+                                    }
+
+                                    @Override
+                                    public boolean isReversed()
+                                    {
+                                        return type.isReversed();
+                                    }
+
+                                    @Override
+                                    public Bijections.Bijection<T> generator()
+                                    {
+                                        return override;
+                                    }
+                                },
+                                kind);
+    }
+
+
     void setColumnIndex(int idx)
     {
         this.columnIndex = idx;
@@ -71,14 +98,14 @@ public class ColumnSpec<T>
         if (o == null || getClass() != o.getClass()) return false;
         ColumnSpec<?> that = (ColumnSpec<?>) o;
         return Objects.equals(name, that.name) &&
-               Objects.equals(type, that.type) &&
+               Objects.equals(type.cqlName, that.type.cqlName) &&
                kind == that.kind;
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(name, type, kind);
+        return Objects.hash(name, type.cqlName, kind);
     }
 
     public String name()
@@ -179,7 +206,7 @@ public class ColumnSpec<T>
             return generator().byteSize();
         }
 
-        public String toString()
+        public final String toString()
         {
             return cqlName;
         }
@@ -189,7 +216,7 @@ public class ColumnSpec<T>
             return cqlName;
         }
 
-        public boolean equals(Object o)
+        public final boolean equals(Object o)
         {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
@@ -197,7 +224,7 @@ public class ColumnSpec<T>
             return Objects.equals(cqlName, dataType.cqlName);
         }
 
-        public int hashCode()
+        public final int hashCode()
         {
             return Objects.hash(cqlName);
         }
@@ -421,17 +448,12 @@ public class ColumnSpec<T>
             return baseType.maxSize();
         }
 
-        public DataType<T> baseType()
-        {
-            return baseType;
-        }
-
         public static <T> DataType<T> getInstance(DataType<T> type)
         {
             ReversedType<T> t = (ReversedType<T>) cache.get(type);
             if (t == null)
                 t = new ReversedType<>(type);
-            assert t.baseType == type : "Type mismatch";
+            assert t.baseType == type : String.format("Type mismatch %s != 
%s", t.baseType, type);
             return t;
         }
     }
diff --git a/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java 
b/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java
index 4ad7f29369..407059dfce 100644
--- a/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java
+++ b/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java
@@ -67,26 +67,38 @@ public class SchemaSpec
                       List<ColumnSpec<?>> regularColumns,
                       List<ColumnSpec<?>> staticColumns)
     {
-        this(keyspace, table, partitionKeys, clusteringKeys, regularColumns, 
staticColumns, false, false, null, false);
+        this(keyspace, table, partitionKeys, clusteringKeys, regularColumns, 
staticColumns, DataGenerators.createKeyGenerator(clusteringKeys), false, false, 
null, false);
     }
 
     public SchemaSpec cloneWithName(String ks,
                                     String table)
     {
-        return new SchemaSpec(ks, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, 
-                              isCompactStorage, disableReadRepair, 
compactionStrategy, trackLts);
+        return new SchemaSpec(ks, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, ckGenerator, isCompactStorage, 
disableReadRepair, compactionStrategy, trackLts);
     }
 
     public SchemaSpec trackLts()
     {
-        return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, 
-                              isCompactStorage, true, compactionStrategy, 
disableReadRepair);
+        return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, ckGenerator, isCompactStorage, 
disableReadRepair, compactionStrategy, true);
     }
 
     public SchemaSpec withCompactStorage()
     {
-        return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, 
regularColumns, 
-                              staticColumns, true, disableReadRepair, 
compactionStrategy, trackLts);
+        return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, ckGenerator, true, disableReadRepair, 
compactionStrategy, trackLts);
+    }
+
+    public SchemaSpec withCompactionStrategy(String compactionStrategy)
+    {
+        return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, ckGenerator, false, disableReadRepair, 
compactionStrategy, trackLts);
+    }
+
+    public SchemaSpec withCkGenerator(DataGenerators.KeyGenerator 
ckGeneratorOverride, List<ColumnSpec<?>> clusteringKeys)
+    {
+        return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, ckGeneratorOverride, isCompactStorage, 
disableReadRepair, compactionStrategy, trackLts);
+    }
+
+    public SchemaSpec withColumns(List<ColumnSpec<?>> regularColumns, 
List<ColumnSpec<?>> staticColumns)
+    {
+        return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, 
regularColumns, staticColumns, ckGenerator, isCompactStorage, 
disableReadRepair, compactionStrategy, trackLts);
     }
 
     public SchemaSpec(String keyspace,
@@ -95,12 +107,14 @@ public class SchemaSpec
                       List<ColumnSpec<?>> clusteringKeys,
                       List<ColumnSpec<?>> regularColumns,
                       List<ColumnSpec<?>> staticColumns,
+                      DataGenerators.KeyGenerator ckGenerator,
                       boolean isCompactStorage,
                       boolean disableReadRepair,
                       String compactionStrategy,
                       boolean trackLts)
     {
-        assert !isCompactStorage || clusteringKeys.size() == 0 || 
regularColumns.size() <= 1;
+        assert !isCompactStorage || clusteringKeys.isEmpty() || 
regularColumns.size() <= 1 :
+        String.format("Compact storage %s. Clustering keys: %d. Regular 
columns: %d", isCompactStorage, clusteringKeys.size(), regularColumns.size());
 
         this.keyspace = keyspace;
         this.table = table;
@@ -133,7 +147,9 @@ public class SchemaSpec
         this.allColumnsSet = Collections.unmodifiableSet(new 
LinkedHashSet<>(all));
 
         this.pkGenerator = DataGenerators.createKeyGenerator(partitionKeys);
-        this.ckGenerator = DataGenerators.createKeyGenerator(clusteringKeys);
+        if (ckGenerator == null)
+            ckGenerator = DataGenerators.createKeyGenerator(clusteringKeys);
+        this.ckGenerator = ckGenerator;
 
         this.ALL_COLUMNS_BITSET = BitSet.allSet(regularColumns.size());
 
@@ -146,6 +162,8 @@ public class SchemaSpec
         this.trackLts = trackLts;
     }
 
+
+
     public static BitSet allColumnsMask(SchemaSpec schema)
     {
         return BitSet.allSet(schema.allColumns.size());
diff --git a/test/harry/main/org/apache/cassandra/harry/dsl/ArrayWrapper.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/ArrayWrapper.java
new file mode 100644
index 0000000000..bddc9ade1f
--- /dev/null
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/ArrayWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.harry.dsl;
+
+import java.util.Arrays;
+
+/**
+ * A small wrapper to allow object arrays to be used as keys in maps.
+ */
+class ArrayWrapper
+{
+    private final Object[] objects;
+
+    public ArrayWrapper(Object[] objects)
+    {
+        this.objects = objects;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ArrayWrapper wrapper = (ArrayWrapper) o;
+        return Arrays.equals(objects, wrapper.objects);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(objects);
+    }
+}
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/BatchVisitBuilder.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/BatchVisitBuilder.java
index f85e09e721..7de5ccf31e 100644
--- a/test/harry/main/org/apache/cassandra/harry/dsl/BatchVisitBuilder.java
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/BatchVisitBuilder.java
@@ -30,14 +30,15 @@ public class BatchVisitBuilder extends 
SingleOperationVisitBuilder implements Cl
     private final HistoryBuilder historyBuilder;
 
     public BatchVisitBuilder(HistoryBuilder historyBuilder,
-                             PartitionVisitState partitionState,
+                             PartitionVisitStateImpl partitionState,
                              long lts,
                              OpSelectors.PureRng rng,
                              OpSelectors.DescriptorSelector descriptorSelector,
-                             SchemaSpec schemaSpec,
+                             SchemaSpec schema,
+                             ValueHelper valueHelper,
                              Consumer<ReplayingVisitor.Visit> appendToLog)
     {
-        super(partitionState, lts, rng, descriptorSelector, schemaSpec, 
appendToLog);
+        super(partitionState, lts, rng, descriptorSelector, schema, 
valueHelper, appendToLog);
         this.historyBuilder = historyBuilder;
     }
 
@@ -70,9 +71,9 @@ public class BatchVisitBuilder extends 
SingleOperationVisitBuilder implements Cl
     }
 
     @Override
-    public BatchVisitBuilder insert(int rowIdx, long[] vds)
+    public BatchVisitBuilder insert(int rowIdx, long[] valueIdxs)
     {
-        super.insert(rowIdx, vds);
+        super.insert(rowIdx, valueIdxs);
         return this;
     }
 
diff --git a/test/harry/main/org/apache/cassandra/harry/dsl/HistoryBuilder.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/HistoryBuilder.java
index a119ac63ea..a8503b9f98 100644
--- a/test/harry/main/org/apache/cassandra/harry/dsl/HistoryBuilder.java
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/HistoryBuilder.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.harry.dsl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -29,6 +28,7 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
 import org.apache.cassandra.harry.clock.ApproximateClock;
@@ -38,6 +38,7 @@ import org.apache.cassandra.harry.ddl.SchemaSpec;
 import org.apache.cassandra.harry.gen.EntropySource;
 import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
 import org.apache.cassandra.harry.model.Model;
+import org.apache.cassandra.harry.model.NoOpChecker;
 import org.apache.cassandra.harry.model.OpSelectors;
 import org.apache.cassandra.harry.model.QuiescentChecker;
 import org.apache.cassandra.harry.model.reconciler.Reconciler;
@@ -97,25 +98,46 @@ import org.apache.cassandra.harry.visitors.VisitExecutor;
  * streams for the values for corresponding columns.
  *
  * Other possible operations are deleteRow, deleteColumns, deleteRowRange, 
deleteRowSlide, and deletePartition.
+ *
+ * HistoryBuilder also allows hardcoding/overriding clustering key, regular, 
and static column values, but _not_
+ * partition keys as of now.
+ *
+ * Since clusterings are ordered according to their value, it is only possible 
to instruct the generator to ensure
+ * such a value is going to be present. This is done by:
+ *
+ *     history.forPartition(1).ensureClustering(new Object[]{ "", "b", -1L, 
"c", "d" });
+ *
+ * For regular and static columns, overrides are done on the top level, not 
per-partition, so you can simply do:
+ *
+ *     history.valueOverrides().override(column.name, 1245, "overridden value");
+ *
+ *     history.visitPartition(1)
+ *            .insert(1, new long[]{ 12345, 12345 });
+ *
+ *  This will insert "overridden value" for the 1st row of the 1st partition, for 
two columns. In other words, the index
+ *  12345 will now be associated with this overridden value. But all other / 
random values will still be random.
  */
 public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>, 
SingleOperationBuilder, BatchOperationBuilder
 {
+    protected final OverridingCkGenerator ckGenerator;
+
     protected final SchemaSpec schema;
     protected final TokenPlacementModel.ReplicationFactor rf;
 
     protected final OpSelectors.PureRng pureRng;
     protected final OpSelectors.DescriptorSelector descriptorSelector;
-
+    protected final ValueHelper valueHelper;
     // TODO: would be great to have a very simple B-Tree here
     protected final Map<Long, ReplayingVisitor.Visit> log;
 
     // TODO: primitive array with a custom/noncopying growth strat
-    protected final Map<Long, PartitionVisitState> partitionStates = new 
HashMap<>();
+    protected final Map<Long, PartitionVisitStateImpl> partitionStates = new 
HashMap<>();
+    protected final Set<Long> visitedPartitions = new HashSet<>();
 
     /**
      * A selector that is going to be used by the model checker.
      */
-    public final PresetPdSelector presetSelector;
+    protected final PresetPdSelector presetSelector;
 
     /**
      * Default selector will select every partition exactly once.
@@ -134,7 +156,11 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
         this.log = new HashMap<>();
         this.pureRng = new OpSelectors.PCGFast(seed);
 
-        this.schema = schema;
+        this.presetSelector = new PresetPdSelector();
+        this.ckGenerator = OverridingCkGenerator.make(schema.ckGenerator);
+        this.valueHelper = new ValueHelper(schema, pureRng);
+        this.schema = schema.withCkGenerator(this.ckGenerator, 
this.ckGenerator.columns)
+                            .withColumns(valueHelper.regularColumns, 
valueHelper.staticColumns);
         this.rf = rf;
 
         // TODO: make clock pluggable
@@ -142,7 +168,6 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
                                      interleaveWindowSize,
                                      new JdkRandomEntropySource(seed));
 
-        this.presetSelector = new PresetPdSelector();
         this.defaultSelector = new OpSelectors.DefaultPdSelector(pureRng, 1, 
1);
 
         this.descriptorSelector = new 
Configuration.CDSelectorConfigurationBuilder()
@@ -152,6 +177,11 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
                                   .make(pureRng, schema);
     }
 
+    public ValueOverrides valueOverrides()
+    {
+        return valueHelper;
+    }
+
     public SchemaSpec schema()
     {
         return schema;
@@ -166,12 +196,13 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
     {
         return clock;
     }
+
     /**
      * Visited partition descriptors _not_ in the order they were visited
      */
     public List<Long> visitedPds()
     {
-        return new ArrayList<>(partitionStates.keySet());
+        return new ArrayList<>(visitedPartitions);
     }
 
     @Override
@@ -183,13 +214,14 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
     protected SingleOperationVisitBuilder singleOpVisitBuilder()
     {
         long visitLts = clock.nextLts();
-        return singleOpVisitBuilder(defaultSelector.pd(visitLts, schema), 
visitLts);
+        return singleOpVisitBuilder(defaultSelector.pd(visitLts, schema), 
visitLts, (ps) -> {});
     }
 
-    protected SingleOperationVisitBuilder singleOpVisitBuilder(long pd, long 
lts)
+    protected SingleOperationVisitBuilder singleOpVisitBuilder(long pd, long 
lts, Consumer<PartitionVisitState> setupPs)
     {
-        PartitionVisitState partitionState = presetSelector.register(lts, pd);
-        return new SingleOperationVisitBuilder(partitionState, lts, pureRng, 
descriptorSelector, schema, (visit) -> {
+        PartitionVisitStateImpl partitionState = presetSelector.register(lts, 
pd, setupPs);
+        return new SingleOperationVisitBuilder(partitionState, lts, pureRng, 
descriptorSelector, schema, valueHelper, (visit) -> {
+            visitedPartitions.add(pd);
             log.put(visit.lts, visit);
         });
     }
@@ -209,15 +241,15 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
     }
 
     @Override
-    public HistoryBuilder insert(int rowId, long[] vds)
+    public HistoryBuilder insert(int rowId, long[] valueIdxs)
     {
-        singleOpVisitBuilder().insert(rowId, vds);
+        singleOpVisitBuilder().insert(rowId, valueIdxs);
         return this;
     }
 
-    public SingleOperationBuilder insert(int rowIdx, long[] vds, long[] sds)
+    public SingleOperationBuilder insert(int rowIdx, long[] valueIdxs, long[] 
sValueIdxs)
     {
-        singleOpVisitBuilder().insert(rowIdx, vds, sds);
+        singleOpVisitBuilder().insert(rowIdx, valueIdxs, sValueIdxs);
         return this;
     }
 
@@ -292,8 +324,9 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
 
     protected BatchVisitBuilder batchVisitBuilder(long pd, long lts)
     {
-        PartitionVisitState partitionState = presetSelector.register(lts, pd);
-        return new BatchVisitBuilder(this, partitionState, lts, pureRng, 
descriptorSelector, schema, (visit) -> {
+        PartitionVisitStateImpl partitionState = presetSelector.register(lts, 
pd, (ps) -> {});
+        return new BatchVisitBuilder(this, partitionState, lts, pureRng, 
descriptorSelector, schema, valueHelper, (visit) -> {
+            visitedPartitions.add(pd);
             log.put(visit.lts, visit);
         });
     }
@@ -302,7 +335,43 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
     {
         long visitLts = clock.nextLts();
         long pd = presetSelector.pdAtPosition(pdIdx);
-        return singleOpVisitBuilder(pd, visitLts);
+        return singleOpVisitBuilder(pd, visitLts, (ps) -> {});
+    }
+
+    public SingleOperationBuilder visitPartition(long pdIdx, 
Consumer<PartitionVisitState> setupPs)
+    {
+        long visitLts = clock.nextLts();
+        long pd = presetSelector.pdAtPosition(pdIdx);
+        return singleOpVisitBuilder(pd, visitLts, setupPs);
+    }
+
+    public PartitionVisitState forPartition(long pdIdx)
+    {
+        long pd = defaultSelector.pdAtPosition(pdIdx, schema);
+        return partitionStates.computeIfAbsent(pd, (pd_) -> 
makePartitionVisitState(pd));
+    }
+
+    private PartitionVisitStateImpl makePartitionVisitState(long pd)
+    {
+        Long[] possibleCds = new Long[maxPartitionSize];
+        for (int cdIdx = 0; cdIdx < possibleCds.length; cdIdx++)
+        {
+            long cd = descriptorSelector.cd(pd, 0, cdIdx, schema);
+            possibleCds[cdIdx] = cd;
+        }
+        Arrays.sort(possibleCds, Long::compare);
+
+        long[] primitiveArray = new long[maxPartitionSize];
+        for (int i = 0; i < possibleCds.length; i++)
+            primitiveArray[i] = possibleCds[i];
+
+        // TODO: can we have something more efficient than a tree set here?
+        return new PartitionVisitStateImpl(pd, primitiveArray, new 
TreeSet<>(), schema);
+    }
+
+    public PresetPdSelector pdSelector()
+    {
+        return presetSelector;
     }
 
     /**
@@ -319,19 +388,17 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
         // TODO: implement a primitive long map?
         private final Map<Long, Long> ltsToPd = new HashMap<>();
 
-        public PartitionVisitState register(long lts, long pd)
+        public PartitionVisitStateImpl register(long lts, long pd, 
Consumer<PartitionVisitState> setup)
         {
             Long prev = ltsToPd.put(lts, pd);
             if (prev != null)
                 throw new IllegalStateException(String.format("LTS %d. Was 
registered twice, first with %d, and then with %d", lts,  prev, pd));
 
-            long[] possibleCds = new long[maxPartitionSize];
-            for (int i = 0; i < possibleCds.length; i++)
-                possibleCds[i] = descriptorSelector.cd(pd, 0, i, schema);
-            Arrays.sort(possibleCds);
-
-            // TODO: can we have something more efficient than a tree set here?
-            PartitionVisitState partitionState = 
partitionStates.computeIfAbsent(pd, (pd_) -> new PartitionVisitState(pd, 
possibleCds, new TreeSet<>()));
+            PartitionVisitStateImpl partitionState = 
partitionStates.computeIfAbsent(pd, (pd_) -> {
+                PartitionVisitStateImpl partitionVisitState = 
makePartitionVisitState(pd);
+                setup.accept(partitionVisitState);
+                return partitionVisitState;
+            });
             partitionState.visitedLts.add(lts);
             return partitionState;
         }
@@ -344,7 +411,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
         public long nextLts(long lts)
         {
             long pd = pd(lts);
-            PartitionVisitState partitionState = partitionStates.get(pd);
+            PartitionVisitStateImpl partitionState = partitionStates.get(pd);
             NavigableSet<Long> visitedLts = 
partitionState.visitedLts.subSet(lts, false, Long.MAX_VALUE, false);
             if (visitedLts.isEmpty())
                 return -1;
@@ -355,7 +422,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
         public long prevLts(long lts)
         {
             long pd = pd(lts);
-            PartitionVisitState partitionState = partitionStates.get(pd);
+            PartitionVisitStateImpl partitionState = partitionStates.get(pd);
             NavigableSet<Long> visitedLts = 
partitionState.visitedLts.descendingSet().subSet(lts, false, 0L, false);
             if (visitedLts.isEmpty())
                 return -1;
@@ -365,7 +432,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
 
         public long maxLtsFor(long pd)
         {
-            PartitionVisitState partitionState = partitionStates.get(pd);
+            PartitionVisitStateImpl partitionState = partitionStates.get(pd);
             if (partitionState == null)
                 return -1;
             return partitionState.visitedLts.last();
@@ -373,7 +440,7 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
 
         public long minLtsFor(long pd)
         {
-            PartitionVisitState partitionState = partitionStates.get(pd);
+            PartitionVisitStateImpl partitionState = partitionStates.get(pd);
             if (partitionState == null)
                 return -1;
             return partitionState.visitedLts.first();
@@ -384,11 +451,6 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
             return defaultSelector.pdAtPosition(pdIdx, schema);
         }
 
-        public Collection<Long> pds()
-        {
-            return partitionStates.keySet();
-        }
-
         public long minLtsAt(long position)
         {
             throw new IllegalArgumentException("not implemented");
@@ -424,6 +486,11 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
         }
     }
 
+    public Model noOpChecker(SystemUnderTest.ConsistencyLevel cl, 
SystemUnderTest sut)
+    {
+        return new NoOpChecker(sut, cl);
+    }
+
     public Model quiescentChecker(DataTracker tracker, SystemUnderTest sut)
     {
         // TODO: CL for quiescent checker
@@ -437,8 +504,8 @@ public class HistoryBuilder implements 
Iterable<ReplayingVisitor.Visit>, SingleO
     {
         return new QuiescentLocalStateChecker(clock, presetSelector, sut, 
tracker, schema,
                                               new Reconciler(presetSelector,
-                                                   schema,
-                                                   this::visitor),
+                                                             schema,
+                                                             this::visitor),
                                               rf);
     }
 
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/OverridingBijection.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/OverridingBijection.java
new file mode 100644
index 0000000000..fe645d2980
--- /dev/null
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/OverridingBijection.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.harry.dsl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.harry.gen.Bijections;
+
+class OverridingBijection<T> implements Bijections.Bijection<T>
+{
+    protected final Bijections.Bijection<T> delegate;
+    protected final Map<Long, T> descriptorToValue;
+    protected final Map<T, Long> valueToDescriptor;
+
+    public OverridingBijection(Bijections.Bijection<T> delegate)
+    {
+        this.delegate = delegate;
+        descriptorToValue = new HashMap<>();
+        valueToDescriptor = new HashMap<>();
+    }
+
+    public void override(long descriptor, T value)
+    {
+        T old = descriptorToValue.get(descriptor);
+        if (old != null)
+            throw new IllegalStateException(String.format("Can't override %d 
twice. Was already overriden to %s", descriptor, old));
+
+        T orig = delegate.inflate(descriptor);
+        if (!orig.equals(value))
+        {
+            descriptorToValue.put(descriptor, value);
+            valueToDescriptor.put(value, descriptor);
+        }
+    }
+
+    @Override
+    public T inflate(long descriptor)
+    {
+        Object v = descriptorToValue.get(descriptor);
+        if (v != null)
+        {
+            return (T) v;
+        }
+        return delegate.inflate(descriptor);
+    }
+
+    @Override
+    public long deflate(T value)
+    {
+        Long descriptor = valueToDescriptor.get(value);
+        if (descriptor != null)
+            return descriptor;
+        return delegate.deflate(value);
+    }
+
+    @Override
+    public int byteSize()
+    {
+        return delegate.byteSize();
+    }
+
+    @Override
+    public int compare(long l, long r)
+    {
+        return delegate.compare(l, r);
+    }
+}
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/OverridingCkGenerator.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/OverridingCkGenerator.java
new file mode 100644
index 0000000000..92aa00f695
--- /dev/null
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/OverridingCkGenerator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.harry.dsl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.harry.ddl.ColumnSpec;
+import org.apache.cassandra.harry.gen.Bijections;
+import org.apache.cassandra.harry.gen.DataGenerators;
+
+/**
+ * A class that helps to override parts of clustering key. The tricky part 
about CK overrides is that Harry model makes
+ * an assumption about the ordering of clustering keys, which means 
clusterings have to be sorted in the same way their
+ * descriptors are. This, combined with reverse types, makes managing this 
state somewhat tricky at times.
+ *
+ * Additionally, Relation in delete/select query receives individual CK 
descriptors (i.e. after they have been sliced),
+ * while most other queries usually operate on inflated clustering key. All of 
this is required for efficient _stateless_
+ * validation, but makes overrides a bit less intuitive.
+ *
+ * To summarise: overrides for inflating are done for individual clustering 
key columns. Overrides for deflating a clustering
+ * operate on an entire key. Main reason for this is to allow having same 
string in several rows for the same column.
+ */
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class OverridingCkGenerator extends DataGenerators.KeyGenerator
+{
+    private final DataGenerators.KeyGenerator delegate;
+    private final KeyPartOverride[] columnValueOverrides;
+    private final List<ColumnSpec<?>> columnSpecOverrides;
+    private final Map<ArrayWrapper, Long> valueToDescriptor;
+
+    // Had to be a static method because you can not call super after you have 
initialised any fields
+    public static OverridingCkGenerator make(DataGenerators.KeyGenerator 
delegate)
+    {
+        KeyPartOverride[] columnValueOverrides = new 
KeyPartOverride[delegate.columns.size()];
+        List<ColumnSpec<?>> columnSpecOverrides = new ArrayList<>();
+        for (int i = 0; i < delegate.columns.size(); i++)
+        {
+            columnValueOverrides[i] = new 
KeyPartOverride<>(delegate.columns.get(i).generator());
+            
columnSpecOverrides.add(delegate.columns.get(i).override(columnValueOverrides[i]));
+        }
+        assert columnValueOverrides.length == columnSpecOverrides.size();
+        return new OverridingCkGenerator(delegate, columnValueOverrides, 
columnSpecOverrides);
+    }
+
+    private OverridingCkGenerator(DataGenerators.KeyGenerator delegate,
+                                  KeyPartOverride[] columnValueOverrides,
+                                  List<ColumnSpec<?>> columnSpecOverrides)
+    {
+        super(columnSpecOverrides);
+        this.columnValueOverrides = columnValueOverrides;
+        this.columnSpecOverrides = columnSpecOverrides;
+        this.delegate = delegate;
+        this.valueToDescriptor = new HashMap<>();
+    }
+
+    public void override(long descriptor, Object[] value)
+    {
+        long[] slices = delegate.slice(descriptor);
+        for (int i = 0; i < slices.length; i++)
+            columnValueOverrides[i].override(slices[i], value[i]);
+
+        // We _always_ deflate clustering key as a package, since we can not 
validate a clustering key without all components anyways.
+        valueToDescriptor.put(new ArrayWrapper(value), descriptor);
+    }
+
+    @Override
+    public Object[] inflate(long descriptor)
+    {
+        return DataGenerators.inflateKey(columnSpecOverrides, descriptor, 
slice(descriptor));
+    }
+
+    @Override
+    public long deflate(Object[] value)
+    {
+        Long descriptor = valueToDescriptor.get(new ArrayWrapper(value));
+        if (descriptor != null)
+            return descriptor;
+
+        return delegate.deflate(value);
+    }
+
+    @Override
+    public int byteSize()
+    {
+        return delegate.byteSize();
+    }
+
+    @Override
+    public int compare(long l, long r)
+    {
+        // Delegate the descriptor comparison; previously this mistakenly returned byteSize().
+        return delegate.compare(l, r);
+    }
+
+    @Override
+    public long[] slice(long descriptor)
+    {
+        return delegate.slice(descriptor);
+    }
+
+    @Override
+    public long stitch(long[] parts)
+    {
+        return delegate.stitch(parts);
+    }
+
+    @Override
+    public long minValue(int idx)
+    {
+        return delegate.minValue(idx);
+    }
+
+    @Override
+    public long maxValue(int idx)
+    {
+        return delegate.maxValue(idx);
+    }
+
+    public static class KeyPartOverride<T> extends OverridingBijection<T>
+    {
+        public KeyPartOverride(Bijections.Bijection<T> delegate)
+        {
+            super(delegate);
+        }
+
+        // We do not use deflate for key part overrides
+        @Override
+        public long deflate(T value)
+        {
+            throw new IllegalStateException();
+        }
+    }
+
+}
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/PartitionVisitState.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/PartitionVisitState.java
index 6944c42cc4..37558274b1 100644
--- a/test/harry/main/org/apache/cassandra/harry/dsl/PartitionVisitState.java
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/PartitionVisitState.java
@@ -1,35 +1,40 @@
 /*
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.cassandra.harry.dsl;
 
-import java.util.NavigableSet;
-
-public class PartitionVisitState
+public interface PartitionVisitState
 {
-    final long pd;
-    final long[] possibleCds;
-    final NavigableSet<Long> visitedLts;
-
-    PartitionVisitState(long pd, long[] possibleCds, NavigableSet<Long> 
visitedLts)
-    {
-        this.pd = pd;
-        this.possibleCds = possibleCds;
-        this.visitedLts = visitedLts;
-    }
+    /**
+     * Informs a generator that a specific clustering key has to be generated.
+     *
+     * Since the Harry model has a few constraints, we can not override an 
arbitrary clustering to an arbitrary value, since
+     * Harry ensures that clustering descriptors are sorted the same way 
clusterings themselves are sorted.
+     *
+     * Luckily, most of the time we need to override just one or a couple of 
values to trigger some edge condition,
+     * which means that we can simply instruct Harry to produce a given 
handcrafted value.
+     *
+     * When using `ensureClustering`, you can not reliably know in advance 
where specifically in the row this value is
+     * going to sort.
+     *
+     * If you need arbitrary overrides, you will have to produce _all_ 
clusterings possible for the given partition.
+     */
+    void ensureClustering(Object[] overrides);
+    void overrideClusterings(Object[][] overrides);
+    long pd();
 }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/PartitionVisitStateImpl.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/PartitionVisitStateImpl.java
new file mode 100644
index 0000000000..d61210515b
--- /dev/null
+++ 
b/test/harry/main/org/apache/cassandra/harry/dsl/PartitionVisitStateImpl.java
@@ -0,0 +1,115 @@
+/*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.cassandra.harry.dsl;
+
+import java.util.Arrays;
+import java.util.NavigableSet;
+
+import org.apache.cassandra.harry.ddl.SchemaSpec;
+
+public class PartitionVisitStateImpl implements PartitionVisitState
+{
+    final long pd;
+    final long[] possibleCds;
+    final NavigableSet<Long> visitedLts;
+    final SchemaSpec schema;
+    private final OverridingCkGenerator ckGenerator;
+
+    PartitionVisitStateImpl(long pd, long[] possibleCds, NavigableSet<Long> 
visitedLts, SchemaSpec schema)
+    {
+        this.pd = pd;
+        this.possibleCds = possibleCds;
+        this.visitedLts = visitedLts;
+        this.schema = schema;
+        this.ckGenerator = (OverridingCkGenerator) schema.ckGenerator;
+    }
+
+
+    /**
+     * Ensures that exactly one of the clustering keys will have the given values.
+     */
+    @Override
+    public void ensureClustering(Object[] overrides)
+    {
+        long cd = findCdForOverride(overrides);
+        ckGenerator.override(cd, overrides);
+    }
+
+    @Override
+    public void overrideClusterings(Object[][] overrides)
+    {
+        assert possibleCds.length == overrides.length;
+        Arrays.sort(overrides, this::compareCds);
+        for (int i = 0; i < overrides.length; i++)
+            ckGenerator.override(possibleCds[i], overrides[i]);
+    }
+
+    @Override
+    public long pd()
+    {
+        return pd;
+    }
+
+    long findCdForOverride(Object[] ck)
+    {
+        int low = 0;
+        int high = possibleCds.length - 1;
+
+        while (low <= high)
+        {
+            int mid = (low + high) >>> 1;
+            long midEl = possibleCds[mid];
+            int cmp = compareCds(ck, midEl);
+
+            if (cmp < 0)
+                low = mid + 1;
+            else if (cmp > 0)
+                high = mid - 1;
+            else
+                throw new IllegalStateException("This value is already 
present");
+        }
+
+        return possibleCds[Math.min(possibleCds.length - 1, low)];
+    }
+
+    private int compareCds(Object[] v1, long cd2)
+    {
+        Object[] v2 = schema.ckGenerator.inflate(cd2);
+        return compareCds(v1, v2);
+    }
+
+    private int compareCds(Object[] v1, Object[] v2)
+    {
+        assert v1.length == v2.length : String.format("Values should be of 
same length: %d != %d\n%s\n%s",
+                                                      v1.length, v2.length, 
Arrays.toString(v1), Arrays.toString(v2));
+
+        for (int i = 0; i < v1.length; i++)
+        {
+            int res = ((Comparable) v2[i]).compareTo(v1[i]);
+            if (res != 0)
+            {
+                if (schema.clusteringKeys.get(i).type.isReversed())
+                    res = res * -1;
+
+                return res;
+            }
+        }
+        return 0;
+    }
+}
\ No newline at end of file
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/ReplayingHistoryBuilder.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/ReplayingHistoryBuilder.java
index 312da15949..60d8e4ea53 100644
--- 
a/test/harry/main/org/apache/cassandra/harry/dsl/ReplayingHistoryBuilder.java
+++ 
b/test/harry/main/org/apache/cassandra/harry/dsl/ReplayingHistoryBuilder.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.harry.dsl;
 
+import java.util.function.Consumer;
+
 import org.apache.cassandra.harry.ddl.SchemaSpec;
 import org.apache.cassandra.harry.model.Model;
 import org.apache.cassandra.harry.sut.SystemUnderTest;
@@ -46,10 +48,11 @@ public class ReplayingHistoryBuilder extends HistoryBuilder
         this.sut = sut;
     }
 
-    protected SingleOperationVisitBuilder singleOpVisitBuilder(long pd, long 
lts)
+    @Override
+    protected SingleOperationVisitBuilder singleOpVisitBuilder(long pd, long 
lts, Consumer<PartitionVisitState> setupPs)
     {
-        PartitionVisitState partitionState = presetSelector.register(lts, pd);
-        return new SingleOperationVisitBuilder(partitionState, lts, pureRng, 
descriptorSelector, schema, (visit) -> {
+        PartitionVisitStateImpl partitionState = presetSelector.register(lts, 
pd, setupPs);
+        return new SingleOperationVisitBuilder(partitionState, lts, pureRng, 
descriptorSelector, schema, valueHelper, (visit) -> {
             log.put(lts, visit);
         }) {
             @Override
@@ -84,8 +87,8 @@ public class ReplayingHistoryBuilder extends HistoryBuilder
     @Override
     protected BatchVisitBuilder batchVisitBuilder(long pd, long lts)
     {
-        PartitionVisitState partitionState = presetSelector.register(lts, pd);
-        return new BatchVisitBuilder(this, partitionState, lts, pureRng, 
descriptorSelector, schema, (visit) -> {
+        PartitionVisitStateImpl partitionState = presetSelector.register(lts, 
pd, (ps) -> {});
+        return new BatchVisitBuilder(this, partitionState, lts, pureRng, 
descriptorSelector, schema, valueHelper, (visit) -> {
             log.put(lts, visit);
         }) {
             @Override
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationBuilder.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationBuilder.java
index 10f2026a7d..a9b98b69ca 100644
--- a/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationBuilder.java
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationBuilder.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.harry.dsl;
 
 public interface SingleOperationBuilder
 {
-
     /**
      * Perform an insert operation to _some_ row
      */
@@ -36,8 +35,8 @@ public interface SingleOperationBuilder
      * Insert _specific values_ into _specific_ row. Rows are ordered by 
clustering key and
      * numbered from 0 to maxRows
      */
-    SingleOperationBuilder insert(int rowIdx, long[] vds);
-    SingleOperationBuilder insert(int rowIdx, long[] vds, long[] sds);
+    SingleOperationBuilder insert(int rowIdx, long[] valueIdxs);
+    SingleOperationBuilder insert(int rowIdx, long[] valueIdxs, long[] 
sValueIdxs);
 
     SingleOperationBuilder deletePartition();
 
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationVisitBuilder.java
 
b/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationVisitBuilder.java
index 05652f13d0..ce25566a24 100644
--- 
a/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationVisitBuilder.java
+++ 
b/test/harry/main/org/apache/cassandra/harry/dsl/SingleOperationVisitBuilder.java
@@ -36,38 +36,44 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
 {
     // TODO: singleton collection for this op class
     private final List<VisitExecutor.BaseOperation> operations;
-    private final PartitionVisitState partitionState;
+    private final PartitionVisitStateImpl partitionState;
 
     private final long lts;
     private final long pd;
 
     private final OpSelectors.PureRng rng;
+
     private final OpSelectors.DescriptorSelector descriptorSelector;
-    private final SchemaSpec schemaSpec;
+    private final ValueHelper valueHelper;
+    private final SchemaSpec schema;
 
     private final Consumer<ReplayingVisitor.Visit> appendToLog;
     private final WithEntropySource rngSupplier = new WithEntropySource();
 
     private int opIdCounter;
 
-    public SingleOperationVisitBuilder(PartitionVisitState partitionState,
+    public SingleOperationVisitBuilder(PartitionVisitStateImpl partitionState,
                                        long lts,
                                        OpSelectors.PureRng rng,
                                        OpSelectors.DescriptorSelector 
descriptorSelector,
-                                       SchemaSpec schemaSpec,
+                                       SchemaSpec schema,
+                                       ValueHelper valueHelper,
                                        Consumer<ReplayingVisitor.Visit> 
appendToLog)
     {
-        this.lts = lts;
+        this.operations = new ArrayList<>();
         this.partitionState = partitionState;
-        this.pd = partitionState.pd;
 
-        this.appendToLog = appendToLog;
-        this.operations = new ArrayList<>();
-        this.opIdCounter = 0;
+        this.pd = partitionState.pd;
+        this.lts = lts;
 
         this.rng = rng;
+
         this.descriptorSelector = descriptorSelector;
-        this.schemaSpec = schemaSpec;
+        this.valueHelper = valueHelper;
+        this.schema = schema;
+
+        this.appendToLog = appendToLog;
+        this.opIdCounter = 0;
     }
 
     @Override
@@ -87,7 +93,7 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
         {
             public long[] vds()
             {
-                return descriptorSelector.vds(pd, cd, lts, opId, kind(), 
schemaSpec);
+                return descriptorSelector.vds(pd, cd, lts, opId, kind(), 
schema);
             }
         });
         end();
@@ -95,8 +101,9 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
     }
 
     @Override
-    public SingleOperationVisitBuilder insert(int rowIdx, long[] vds)
+    public SingleOperationVisitBuilder insert(int rowIdx, long[] valueIdxs)
     {
+        assert valueIdxs.length == valueHelper.regularColumns.size();
         int opId = opIdCounter++;
         long cd = partitionState.possibleCds[rowIdx];
         operations.add(new GeneratingVisitor.GeneratedWriteOp(lts, pd, cd, 
opId,
@@ -104,6 +111,13 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
         {
             public long[] vds()
             {
+                long[] vds = new long[valueIdxs.length];
+                for (int i = 0; i < valueHelper.regularColumns.size(); i++)
+                {
+                    vds[i] = valueHelper.descriptorGenerators
+                             .get(valueHelper.regularColumns.get(i).name)
+                             .inflate(valueIdxs[i]);
+                }
                 return vds;
             }
         });
@@ -112,23 +126,39 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
     }
 
     @Override
-    public SingleOperationBuilder insert(int rowIdx, long[] vds, long[] sds)
+    public SingleOperationBuilder insert(int rowIdx, long[] valueIdxs, long[] 
sValueIdxs)
     {
+        assert valueIdxs.length == valueHelper.regularColumns.size();
+        assert sValueIdxs.length == valueHelper.staticColumns.size();
         int opId = opIdCounter++;
         long cd = partitionState.possibleCds[rowIdx];
         operations.add(new GeneratingVisitor.GeneratedWriteWithStaticOp(lts, 
pd, cd, opId,
                                                                         
OpSelectors.OperationKind.INSERT_WITH_STATICS)
         {
             @Override
-            public long[] sds()
+            public long[] vds()
             {
-                return sds;
+                long[] vds = new long[valueIdxs.length];
+                for (int i = 0; i < valueHelper.regularColumns.size(); i++)
+                {
+                    vds[i] = valueHelper.descriptorGenerators
+                             .get(valueHelper.regularColumns.get(i).name)
+                             .inflate(valueIdxs[i]);
+                }
+                return vds;
             }
 
             @Override
-            public long[] vds()
+            public long[] sds()
             {
-                return vds;
+                long[] sds = new long[sValueIdxs.length];
+                for (int i = 0; i < valueHelper.staticColumns.size(); i++)
+                {
+                    sds[i] = valueHelper.descriptorGenerators
+                             .get(valueHelper.staticColumns.get(i).name)
+                             .inflate(sValueIdxs[i]);
+                }
+                return sds;
             }
         });
         end();
@@ -140,7 +170,7 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
     {
         int opId = opIdCounter++;
         operations.add(new GeneratingVisitor.GeneratedDeleteOp(lts, pd, opId, 
OpSelectors.OperationKind.DELETE_PARTITION,
-                                                               
Query.selectPartition(schemaSpec, pd, false)));
+                                                               
Query.selectPartition(schema, pd, false)));
         end();
         return this;
     }
@@ -205,7 +235,7 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
 
                     boolean isMinEq = rng.nextBoolean();
                     boolean isMaxEq = rng.nextBoolean();
-                    query = Query.clusteringRangeQuery(schemaSpec, pd, cd1, 
cd2, queryDescriptor, isMinEq, isMaxEq, false);
+                    query = Query.clusteringRangeQuery(schema, pd, cd1, cd2, 
queryDescriptor, isMinEq, isMaxEq, false);
                     break;
                 }
                 catch (IllegalArgumentException retry)
@@ -227,7 +257,7 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
 
         long cd1 = partitionState.possibleCds[lowBoundRowIdx];
         long cd2 = partitionState.possibleCds[highBoundRowIdx];
-        Query query = Query.clusteringRangeQuery(schemaSpec, pd, cd1, cd2, 
queryDescriptor, isMinEq, isMaxEq, false);
+        Query query = Query.clusteringRangeQuery(schema, pd, cd1, cd2, 
queryDescriptor, isMinEq, isMaxEq, false);
         operations.add(new GeneratingVisitor.GeneratedDeleteOp(lts, pd, opId, 
OpSelectors.OperationKind.DELETE_SLICE, query));
         end();
         return this;
@@ -249,7 +279,7 @@ class SingleOperationVisitBuilder implements 
SingleOperationBuilder
 
                     boolean isGt = rng.nextBoolean();
                     boolean isEquals = rng.nextBoolean();
-                    query = Query.clusteringSliceQuery(schemaSpec, pd, cd, 
queryDescriptor, isGt, isEquals, false);
+                    query = Query.clusteringSliceQuery(schema, pd, cd, 
queryDescriptor, isGt, isEquals, false);
                     break;
                 }
                 catch (IllegalArgumentException retry)
diff --git 
a/test/harry/main/org/apache/cassandra/harry/dsl/ValueDescriptorIndexGenerator.java
 
b/test/harry/main/org/apache/cassandra/harry/dsl/ValueDescriptorIndexGenerator.java
index dc74412c3a..fd100f869c 100644
--- 
a/test/harry/main/org/apache/cassandra/harry/dsl/ValueDescriptorIndexGenerator.java
+++ 
b/test/harry/main/org/apache/cassandra/harry/dsl/ValueDescriptorIndexGenerator.java
@@ -37,12 +37,18 @@ import org.apache.cassandra.harry.model.OpSelectors;
  */
 public class ValueDescriptorIndexGenerator implements 
Surjections.Surjection<Long>
 {
+    public static int UNSET = Integer.MIN_VALUE;
+
     private final OpSelectors.PureRng rng;
     private final long columnHash;
     private final long mask;
 
-    public ValueDescriptorIndexGenerator(ColumnSpec<?> columnSpec,
-                                         OpSelectors.PureRng rng)
+    public ValueDescriptorIndexGenerator(ColumnSpec<?> columnSpec, long seed)
+    {
+        this(columnSpec, new OpSelectors.PCGFast(seed));
+    }
+
+    public ValueDescriptorIndexGenerator(ColumnSpec<?> columnSpec, 
OpSelectors.PureRng rng)
     {
         this.rng = rng;
         this.columnHash = columnSpec.hashCode();
@@ -52,6 +58,9 @@ public class ValueDescriptorIndexGenerator implements 
Surjections.Surjection<Lon
     @Override
     public Long inflate(long idx)
     {
+        if (idx == UNSET)
+            return DataGenerators.UNSET_DESCR;
+
         return rng.randomNumber(idx, columnHash) & mask;
     }
 
diff --git a/test/harry/main/org/apache/cassandra/harry/dsl/ValueHelper.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/ValueHelper.java
new file mode 100644
index 0000000000..fcce222543
--- /dev/null
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/ValueHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.harry.dsl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.harry.ddl.ColumnSpec;
+import org.apache.cassandra.harry.ddl.SchemaSpec;
+import org.apache.cassandra.harry.model.OpSelectors;
+
+class ValueHelper implements ValueOverrides
+{
+    protected Map<String, OverridingBijection<?>> overrides = new HashMap<>();
+    protected Map<String, ValueDescriptorIndexGenerator> descriptorGenerators 
= new HashMap<>();
+
+    protected final List<ColumnSpec<?>> regularColumns;
+    protected final List<ColumnSpec<?>> staticColumns;
+
+    @SuppressWarnings("rawtypes,unchecked")
+    public ValueHelper(SchemaSpec orig, OpSelectors.PureRng rng)
+    {
+        this.regularColumns = new ArrayList<>();
+        for (ColumnSpec<?> regular : orig.regularColumns)
+        {
+            OverridingBijection override = new 
OverridingBijection<>(regular.generator());
+            regular = regular.override(override);
+            this.regularColumns.add(regular);
+            this.overrides.put(regular.name, override);
+            this.descriptorGenerators.put(regular.name, new 
ValueDescriptorIndexGenerator(regular, rng));
+        }
+
+        this.staticColumns = new ArrayList<>();
+        for (ColumnSpec<?> static_ : orig.staticColumns)
+        {
+            OverridingBijection override = new 
OverridingBijection<>(static_.generator());
+            static_ = static_.override(override);
+            this.staticColumns.add(static_);
+            this.overrides.put(static_.name, override);
+            this.descriptorGenerators.put(static_.name, new 
ValueDescriptorIndexGenerator(static_, rng));
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked,rawtypes")
+    public void override(String columnName, int idx, Object override)
+    {
+        OverridingBijection gen = overrides.get(columnName);
+        if (gen == null)
+            throw new IllegalStateException(String.format("Overrides for %s 
are not supported", columnName));
+        if (idx == ValueDescriptorIndexGenerator.UNSET)
+            throw new IllegalStateException("Can't override an UNSET value");
+
+        gen.override(descriptorGenerators.get(columnName).inflate(idx), 
override);
+    }
+}
diff --git a/test/harry/main/org/apache/cassandra/harry/dsl/ValueOverrides.java 
b/test/harry/main/org/apache/cassandra/harry/dsl/ValueOverrides.java
new file mode 100644
index 0000000000..df1e61e0f3
--- /dev/null
+++ b/test/harry/main/org/apache/cassandra/harry/dsl/ValueOverrides.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.harry.dsl;
+
+public interface ValueOverrides
+{
+    void override(String column, int idx, Object override);
+}
diff --git a/test/harry/main/org/apache/cassandra/harry/gen/DataGenerators.java 
b/test/harry/main/org/apache/cassandra/harry/gen/DataGenerators.java
index 70ae26c63b..39c0c80baf 100644
--- a/test/harry/main/org/apache/cassandra/harry/gen/DataGenerators.java
+++ b/test/harry/main/org/apache/cassandra/harry/gen/DataGenerators.java
@@ -39,6 +39,8 @@ public class DataGenerators
     // during value generation
     public static long UNSET_DESCR = Long.MAX_VALUE;
     public static long NIL_DESCR = Long.MIN_VALUE;
+    // Empty value, for the types that support it
+    public static long EMPTY_VALUE = Long.MIN_VALUE + 1;
 
     public static Object[] inflateData(List<ColumnSpec<?>> columns, long[] 
descriptors)
     {
@@ -95,7 +97,7 @@ public class DataGenerators
                         this.maxSize = maxSize;
                     }
                 }
-                int[] bytes = new int[Math.min(4, columns.size())];
+                int[] bytes = new 
int[Math.min(KeyGenerator.MAX_UNIQUE_PREFIX_COLUMNS, columns.size())];
                 Pair[] sorted = new Pair[bytes.length];
                 for (int i = 0; i < sorted.length; i++)
                     sorted[i] = new Pair(i, columns.get(i).type.maxSize());
@@ -163,7 +165,7 @@ public class DataGenerators
         assert columns.size() == values.length : String.format("%s != %s", 
columns.size(), values.length);
         assert columns.size() > 0 : "Can't deflate from empty columnset";
 
-        int fixedPart = Math.min(4, columns.size());
+        int fixedPart = Math.min(KeyGenerator.MAX_UNIQUE_PREFIX_COLUMNS, 
columns.size());
 
         long[] slices = new long[fixedPart];
         boolean allNulls = true;
@@ -251,10 +253,13 @@ public class DataGenerators
 
     public static abstract class KeyGenerator implements 
Bijections.Bijection<Object[]>
     {
+        // Maximum number of columns that uniquely identify the value (i.e. 
use entropy bits).
+        // Subsequent columns will have random data in them.
+        public static final int MAX_UNIQUE_PREFIX_COLUMNS = 4;
         @VisibleForTesting
         public final List<ColumnSpec<?>> columns;
 
-        KeyGenerator(List<ColumnSpec<?>> columns)
+        protected KeyGenerator(List<ColumnSpec<?>> columns)
         {
             this.columns = columns;
         }
@@ -395,7 +400,6 @@ public class DataGenerators
             int maxSliceSize = gen.byteSize();
             int actualSliceSize = sizes[idx];
 
-
             if (idx == 0)
             {
                 // We consume a sign of a descriptor (long, long), (int, int), 
etc.
@@ -430,7 +434,7 @@ public class DataGenerators
 
         public long[] slice(long descriptor)
         {
-            long[] pieces = new long[sizes.length];
+            long[] pieces = new long[columns.size()];
             long pos = totalSize;
             for (int i = 0; i < sizes.length; i++)
             {
@@ -445,6 +449,15 @@ public class DataGenerators
                 pieces[i] = piece;
                 pos -= size;
             }
+
+            // The rest can be random, since the prefix is always fixed
+            long current = descriptor;
+            for (int i = sizes.length; i < columns.size(); i++)
+            {
+                current = RngUtils.next(current);
+                pieces[i] = 
columns.get(i).generator().adjustEntropyDomain(current);
+            }
+
             return pieces;
         }
 
@@ -467,7 +480,6 @@ public class DataGenerators
             return stitched;
         }
 
-
         public long minValue(int idx)
         {
             long res = columns.get(idx).generator().minValue();
diff --git a/test/harry/main/org/apache/cassandra/harry/model/NoOpChecker.java 
b/test/harry/main/org/apache/cassandra/harry/model/NoOpChecker.java
index 10e7fab994..7683f0ef95 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/NoOpChecker.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/NoOpChecker.java
@@ -24,17 +24,21 @@ import org.apache.cassandra.harry.operations.Query;
 
 public class NoOpChecker implements Model
 {
-    private final Run run;
-
+    private final SystemUnderTest sut;
+    private final SystemUnderTest.ConsistencyLevel cl;
     public NoOpChecker(Run run)
     {
-        this.run = run;
+        this(run.sut, SystemUnderTest.ConsistencyLevel.QUORUM);
+    }
+
+    public NoOpChecker(SystemUnderTest sut, SystemUnderTest.ConsistencyLevel 
cl)
+    {
+        this.sut = sut;
+        this.cl = cl;
     }
 
     public void validate(Query query)
     {
-        run.sut.execute(query.toSelectStatement(),
-                        // TODO: make it configurable
-                        SystemUnderTest.ConsistencyLevel.QUORUM);
+        sut.execute(query.toSelectStatement(), cl);
     }
 }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/operations/Relation.java 
b/test/harry/main/org/apache/cassandra/harry/operations/Relation.java
index c953a0da4d..e957125192 100644
--- a/test/harry/main/org/apache/cassandra/harry/operations/Relation.java
+++ b/test/harry/main/org/apache/cassandra/harry/operations/Relation.java
@@ -97,7 +97,7 @@ public class Relation
 
     public static void addRelation(long[] key, List<ColumnSpec<?>> 
columnSpecs, List<Relation> relations, RelationKind kind)
     {
-        assert key.length == columnSpecs.size() :
+        assert key.length == columnSpecs.size() || key.length > 
DataGenerators.KeyGenerator.MAX_UNIQUE_PREFIX_COLUMNS :
         String.format("Key size (%d) should equal to column spec size (%d). 
Specs: %s", key.length, columnSpecs.size(), columnSpecs);
         for (int i = 0; i < key.length; i++)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to