This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit df24dacc0c5fc679fe018d18691db40b5f996fc3
Merge: 7160c49ed7 07831c9cc7
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Wed Apr 23 17:58:50 2025 -0700

    Merge branch 'cassandra-5.0' into trunk

 CHANGES.txt                                        |  1 +
 .../db/compaction/LeveledCompactionStrategy.java   |  3 +-
 .../index/sai/disk/v1/MemtableIndexWriter.java     | 15 ++--
 .../cassandra/distributed/shared/ClusterUtils.java | 40 +++++++++
 .../test/sai/PartialWritesWithRepairTest.java      | 96 ++++++++++++++++++++++
 .../fuzz/sai/AccordFullMultiNodeSAITest.java       |  2 +-
 .../fuzz/sai/AccordInteropMultiNodeSAITest.java    |  2 +-
 .../cassandra/utils/CassandraGenerators.java       | 34 +++++++-
 8 files changed, 182 insertions(+), 11 deletions(-)

diff --cc CHANGES.txt
index d37705c1e8,21b9f481b2..45ea128630
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,186 -1,9 +1,187 @@@
 -5.0.5
 - * SAI marks an index as non-empty when a partial partition/row modification is flushed due to repair (CASSANDRA-20567)
 - * SAI fails queries when multiple columns exist and a non-indexed column is a composite with a map (CASSANDRA-19891)
 +5.1
 + * Change SSTableSimpleScanner to use SSTableReader#openDataReaderForScan (CASSANDRA-20538)
 + * Automated Repair Inside Cassandra [CEP-37] (CASSANDRA-19918)
   * Grant permission on keyspaces system_views and system_virtual_schema not possible (CASSANDRA-20171)
 + * General Purpose Transactions (Accord) [CEP-15] (CASSANDRA-17092)
 + * Improve performance when getting writePlacementsAllSettled from ClusterMetadata (CASSANDRA-20526)
 + * Add nodetool command to dump the contents of the system_views.{cluster_metadata_log, cluster_metadata_directory} tables (CASSANDRA-20525)
 + * Fix TreeMap race in CollectionVirtualTableAdapter causing us to lose rows in the virtual table (CASSANDRA-20524)
 + * Improve metadata log catch up with inter-DC mutation forwarding (CASSANDRA-20523)
 + * Support topology-safe changes to Datacenter & Rack for live nodes (CASSANDRA-20528)
 + * Add SSTableIntervalTree latency metric (CASSANDRA-20502)
 + * Ignore repetitions of semicolon in CQLSH (CASSANDRA-19956)
 + * Avoid NPE during cms initialization abort (CASSANDRA-20527)
 + * Avoid failing queries when epoch changes and replica goes up/down (CASSANDRA-20489)
 + * Split out truncation record lock (CASSANDRA-20480)
 + * Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402)
 + * Fix Paxos repair interrupts running transactions (CASSANDRA-20469)
 + * Various fixes in constraint framework (CASSANDRA-20481)
 + * Add support in CAS for -= on numeric types, and fixed improper handling of empty bytes which led to NPE (CASSANDRA-20477)
 + * Do not fail to start a node with materialized views after they are turned off in config (CASSANDRA-20452)
 + * Fix nodetool gcstats output, support human-readable units and more output formats (CASSANDRA-19022)
 + * Various gossip to TCM upgrade fixes (CASSANDRA-20483)
 + * Add nodetool command to abort failed nodetool cms initialize (CASSANDRA-20482)
 + * Repair Paxos for the distributed metadata log when CMS membership changes (CASSANDRA-20467)
 + * Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346)
 + * Add min/max/mean/percentiles to timer metrics vtable (CASSANDRA-20466)
 + * Add support for time, date, timestamp types in scalar constraint (CASSANDRA-20274)
 + * Add regular expression constraint (CASSANDRA-20275)
 + * Improve constraints autocompletion (CASSANDRA-20341)
 + * Add JVM version and Cassandra build date to nodetool version -v (CASSANDRA-19721)
 + * Move all disk error logic to DiskErrorsHandler to enable pluggability (CASSANDRA-20363)
   * Fix marking an SSTable as suspected and BufferPool leakage in case of a corrupted SSTable read during a compaction (CASSANDRA-20396)
 + * Add missing documentation for CREATE TABLE LIKE (CASSANDRA-20401)
 + * Add OCTET_LENGTH constraint (CASSANDRA-20340)
 + * Reduce memory allocations in miscellaneous places along the hot write path (CASSANDRA-20167)
 + * Provide keystore_password_file and truststore_password_file options to read credentials from a file (CASSANDRA-13428)
 + * Unregistering a node should also remove it from tokenMap if it is there and recalculate the placements (CASSANDRA-20346)
 + * Fix PartitionUpdate.isEmpty deserialization issue to avoid potential EOFException (CASSANDRA-20345)
 + * Avoid adding LEFT nodes to tokenMap on upgrade from gossip (CASSANDRA-20344)
 + * Allow empty placements when deserializing cluster metadata (CASSANDRA-20343)
 + * Reduce heap pressure when initializing CMS (CASSANDRA-20267)
 + * Paxos Repair: NoSuchElementException on DistributedSchema.getKeyspaceMetadata (CASSANDRA-20320)
 + * Improve performance of DistributedSchema.validate for large schemas (CASSANDRA-20360)
 + * Add JSON constraint (CASSANDRA-20273)
 + * Prevent invalid constraint combinations (CASSANDRA-20330)
 + * Support CREATE TABLE LIKE WITH INDEXES (CASSANDRA-19965)
 + * Invalidate relevant prepared statements on every change to TableMetadata (CASSANDRA-20318)
 + * Add per type max size guardrails (CASSANDRA-19677)
 + * Make it possible to abort all kinds of multi step operations (CASSANDRA-20217)
 + * Do not leak non-Java exceptions when calling snapshot operations via JMX (CASSANDRA-20335)
 + * Implement NOT_NULL constraint (CASSANDRA-20276)
 + * Improve error messages for constraints (CASSANDRA-20266)
 + * Add system_views.partition_key_statistics for querying SSTable metadata (CASSANDRA-20161)
 + * CEP-42 - Add Constraints Framework (CASSANDRA-19947)
 + * Add table metric PurgeableTombstoneScannedHistogram and a tracing event for scanned purgeable tombstones (CASSANDRA-20132)
 + * Make sure we can parse the expanded CQL before writing it to the log or sending it to replicas (CASSANDRA-20218)
 + * Add format_bytes and format_time functions (CASSANDRA-19546)
 + * Fix error when trying to assign a tuple to target type not being a tuple (CASSANDRA-20237)
 + * Fail CREATE TABLE LIKE statement if UDTs in target keyspace do not exist or they have different structure from ones in source keyspace (CASSANDRA-19966)
 + * Support octet_length and length functions (CASSANDRA-20102)
 + * Make JsonUtils serialize Instant always with the same format (CASSANDRA-20209)
 + * Port Harry v2 to trunk (CASSANDRA-20200)
 + * Enable filtering of snapshots on keyspace, table and snapshot name in nodetool listsnapshots (CASSANDRA-20151)
 + * Create manifest upon loading where it does not exist or enrich it (CASSANDRA-20150)
 + * Propagate true size of snapshot in SnapshotDetailsTabularData to not call JMX twice in nodetool listsnapshots (CASSANDRA-20149)
 + * Implementation of CEP-43 - copying a table via CQL by CREATE TABLE LIKE (CASSANDRA-19964)
 + * Periodically disconnect roles that are revoked or have LOGIN=FALSE set (CASSANDRA-19385)
 + * AST library for CQL-based fuzz tests (CASSANDRA-20198)
 + * Support audit logging for JMX operations (CASSANDRA-20128)
 + * Enable sorting of nodetool status output (CASSANDRA-20104)
 + * Support downgrading after CMS is initialized (CASSANDRA-20145)
 + * Deprecate IEndpointSnitch (CASSANDRA-19488)
 + * Check presence of a snapshot in a case-insensitive manner on macOS platform to prevent hardlinking failures (CASSANDRA-20146)
 + * Enable JMX server configuration to be in cassandra.yaml (CASSANDRA-11695)
 + * Parallelized UCS compactions (CASSANDRA-18802)
 + * Avoid prepared statement invalidation race when committing schema changes (CASSANDRA-20116)
 + * Restore optimization in MultiCBuilder around building one clustering (CASSANDRA-20129)
 + * Consolidate all snapshot management to SnapshotManager and introduce SnapshotManagerMBean (CASSANDRA-18111)
 + * Fix RequestFailureReason constants codes (CASSANDRA-20126)
   * Introduce SSTableSimpleScanner for compaction (CASSANDRA-20092)
 + * Include column drop timestamp in alter table transformation (CASSANDRA-18961)
 + * Make JMX SSL configurable in cassandra.yaml (CASSANDRA-18508)
 + * Fix cqlsh CAPTURE command to save query results without trace details when TRACING is ON (CASSANDRA-19105)
 + * Optionally prevent tombstone purging during repair (CASSANDRA-20071)
 + * Add post-filtering support for the IN operator in SAI queries (CASSANDRA-20025)
 + * Don’t finish ongoing decommission and move operations during startup (CASSANDRA-20040)
 + * Nodetool reconfigure cms has correct return code when streaming fails (CASSANDRA-19972)
 + * Reintroduce RestrictionSet#iterator() optimization around multi-column restrictions (CASSANDRA-20034)
 + * Explicitly localize strings to Locale.US for internal implementation (CASSANDRA-19953)
 + * Add -H option for human-friendly output in nodetool compactionhistory (CASSANDRA-20015)
 + * Fix type check for referenced duration type for nested types (CASSANDRA-19890)
 + * In simulation tests, correctly set the tokens of replacement nodes (CASSANDRA-19997)
 + * During TCM upgrade, retain all properties of existing system tables (CASSANDRA-19992)
 + * Properly cancel in-flight futures and reject requests in EpochAwareDebounce during shutdown (CASSANDRA-19848)
 + * Provide clearer exception message on failing commitlog_disk_access_mode combinations (CASSANDRA-19812)
 + * Add total space used for a keyspace to nodetool tablestats (CASSANDRA-19671)
 + * Ensure Relation#toRestriction() handles ReversedType properly (CASSANDRA-19950)
 + * Add JSON and YAML output option to nodetool gcstats (CASSANDRA-19771)
 + * Introduce metadata serialization version V4 (CASSANDRA-19970)
 + * Allow CMS reconfiguration to work around DOWN nodes (CASSANDRA-19943)
 + * Make TableParams.Serializer set allowAutoSnapshots and incrementalBackups (CASSANDRA-19954)
 + * Make sstabledump possible to show tombstones only (CASSANDRA-19939)
 + * Ensure that RFP queries potentially stale replicas even with only key columns in the row filter (CASSANDRA-19938)
 + * Allow nodes to change IP address while upgrading to TCM (CASSANDRA-19921)
 + * Retain existing keyspace params on system tables after upgrade (CASSANDRA-19916)
 + * Deprecate use of gossip state for paxos electorate verification (CASSANDRA-19904)
 + * Update dtest-api to 0.0.17 to fix jvm17 crash in jvm-dtests (CASSANDRA-19239)
 + * Add resource leak test and Update Netty to 4.1.113.Final to fix leak (CASSANDRA-19783)
 + * Fix incorrect nodetool suggestion when gossip mode is running (CASSANDRA-19905)
 + * SAI support for BETWEEN operator (CASSANDRA-19688)
 + * Fix BETWEEN filtering for reversed clustering columns (CASSANDRA-19878)
 + * Retry if node leaves CMS while committing a transformation (CASSANDRA-19872)
 + * Add support for NOT operators in WHERE clauses. Fixed Three Valued Logic (CASSANDRA-18584)
 + * Allow getendpoints for system tables and make sure getNaturalReplicas work for MetaStrategy (CASSANDRA-19846)
 + * On upgrade, handle pre-existing tables with unexpected table ids (CASSANDRA-19845)
 + * Reconfigure CMS before assassinate (CASSANDRA-19768)
 + * Warn about unqualified prepared statement only if it is select or modification statement (CASSANDRA-18322)
 + * Update legacy peers tables during node replacement (CASSANDRA-19782)
 + * Refactor ColumnCondition (CASSANDRA-19620)
 + * Allow configuring log format for Audit Logs (CASSANDRA-19792)
 + * Support for noboolean rpm (centos7 compatible) packages removed (CASSANDRA-19787)
 + * Allow threads waiting for the metadata log follower to be interrupted (CASSANDRA-19761)
 + * Support dictionary lookup for CassandraPasswordValidator (CASSANDRA-19762)
 + * Disallow denylisting keys in system_cluster_metadata (CASSANDRA-19713)
 + * Fix gossip status after replacement (CASSANDRA-19712)
 + * Ignore repair requests for system_cluster_metadata (CASSANDRA-19711)
 + * Avoid ClassCastException when verifying tables with reversed partitioner (CASSANDRA-19710)
 + * Always repair the full range when repairing system_cluster_metadata (CASSANDRA-19709)
 + * Use table-specific partitioners during Paxos repair (CASSANDRA-19714)
 + * Expose current compaction throughput in nodetool (CASSANDRA-13890)
 + * CEP-24 Password validation / generation (CASSANDRA-17457)
 + * Reconfigure CMS after replacement, bootstrap and move operations (CASSANDRA-19705)
 + * Support querying LocalStrategy tables with any partitioner (CASSANDRA-19692)
 + * Relax slow_query_log_timeout for MultiNodeSAITest (CASSANDRA-19693)
 + * Audit Log entries are missing identity for mTLS connections (CASSANDRA-19669)
 + * Add support for the BETWEEN operator in WHERE clauses (CASSANDRA-19604)
 + * Replace Stream iteration with for-loop for SimpleRestriction::bindAndGetClusteringElements (CASSANDRA-19679)
 + * Consolidate logging on trace level (CASSANDRA-19632)
 + * Expand DDL statements on coordinator before submission to the CMS (CASSANDRA-19592)
 + * Fix number of arguments of String.format() in various classes (CASSANDRA-19645)
 + * Remove unused fields from config (CASSANDRA-19599)
 + * Refactor Relation and Restriction hierarchies (CASSANDRA-19341)
 + * Raise priority of TCM internode messages during critical operations (CASSANDRA-19517)
 + * Add nodetool command to unregister LEFT nodes (CASSANDRA-19581)
 + * Add cluster metadata id to gossip syn messages (CASSANDRA-19613)
 + * Reduce heap usage occupied by the metrics (CASSANDRA-19567)
 + * Improve handling of transient replicas during range movements (CASSANDRA-19344)
 + * Enable debounced internode log requests to be cancelled at shutdown (CASSANDRA-19514)
 + * Correctly set last modified epoch when combining multistep operations into a single step (CASSANDRA-19538)
 + * Add new TriggersPolicy configuration to allow operators to disable triggers (CASSANDRA-19532)
 + * Use Transformation.Kind.id in local and distributed log tables (CASSANDRA-19516)
 + * Remove period field from ClusterMetadata and metadata log tables (CASSANDRA-19482)
 + * Enrich system_views.pending_hints vtable with hints sizes (CASSANDRA-19486)
 + * Expose all dropwizard metrics in virtual tables (CASSANDRA-14572)
 + * Ensured that PropertyFileSnitchTest does not overwrite cassandra-topology.properties (CASSANDRA-19502)
 + * Add option for MutualTlsAuthenticator to restrict the certificate validity period (CASSANDRA-18951)
 + * Fix StorageService::constructRangeToEndpointMap for non-distributed keyspaces (CASSANDRA-19255)
 + * Group nodetool cms commands into single command group (CASSANDRA-19393)
 + * Register the measurements of the bootstrap process as Dropwizard metrics (CASSANDRA-19447)
 + * Add LIST SUPERUSERS CQL statement (CASSANDRA-19417)
 + * Modernize CQLSH datetime conversions (CASSANDRA-18879)
 + * Harry model and in-JVM tests for partition-restricted 2i queries (CASSANDRA-18275)
 + * Refactor cqlshmain global constants (CASSANDRA-19201)
 + * Remove native_transport_port_ssl (CASSANDRA-19397)
 + * Make nodetool reconfigurecms sync by default and add --cancel to be able to cancel ongoing reconfigurations (CASSANDRA-19216)
 + * Expose auth mode in system_views.clients, nodetool clientstats, metrics (CASSANDRA-19366)
 + * Remove sealed_periods and last_sealed_period tables (CASSANDRA-19189)
 + * Improve setup and initialisation of LocalLog/LogSpec (CASSANDRA-19271)
 + * Refactor structure of caching metrics and expose auth cache metrics via JMX (CASSANDRA-17062)
 + * Allow CQL client certificate authentication to work without sending an AUTHENTICATE request (CASSANDRA-18857)
 + * Extend nodetool tpstats and system_views.thread_pools with detailed pool parameters (CASSANDRA-19289)
 + * Remove dependency on Sigar in favor of OSHI (CASSANDRA-16565)
 + * Simplify the bind marker and Term logic (CASSANDRA-18813)
 + * Limit cassandra startup to supported JDKs, allow higher JDKs by setting CASSANDRA_JDK_UNSUPPORTED (CASSANDRA-18688)
 + * Standardize nodetool tablestats formatting of data units (CASSANDRA-19104)
 + * Make nodetool tablestats use number of significant digits for time and average values consistently (CASSANDRA-19015)
 + * Upgrade jackson to 2.15.3 and snakeyaml to 2.1 (CASSANDRA-18875)
 + * Transactional Cluster Metadata [CEP-21] (CASSANDRA-18330)
 + * Add ELAPSED command to cqlsh (CASSANDRA-18861)
 + * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
 + * Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
 +Merged from 5.0:
++ * SAI marks an index as non-empty when a partial partition/row modification is flushed due to repair (CASSANDRA-20567)
 + * SAI fails queries when multiple columns exist and a non-indexed column is a composite with a map (CASSANDRA-19891)
   * Avoid purging deletions in RowFilter when reconciliation is required (CASSANDRA-20541)
   * Fixed multiple single-node SAI query bugs relating to static columns (CASSANDRA-20338)
   * Upgrade com.datastax.cassandra:cassandra-driver-core:3.11.5 to org.apache.cassandra:cassandra-driver-core:3.12.1 (CASSANDRA-17231)
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index a68efa120c,58166630bc..fbf2894187
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@@ -47,11 -47,11 +47,12 @@@ import static org.apache.cassandra.conf
  public class LeveledCompactionStrategy extends AbstractCompactionStrategy
  {
      private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
 -    private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
 +    public static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
      private static final boolean tolerateSstableSize = TOLERATE_SSTABLE_SIZE.getBoolean();
 -    private static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size";
 -    private static final String SINGLE_SSTABLE_UPLEVEL_OPTION = "single_sstable_uplevel";
 +    public static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size";
 +    public static final String SINGLE_SSTABLE_UPLEVEL_OPTION = "single_sstable_uplevel";
      public static final int DEFAULT_LEVEL_FANOUT_SIZE = 10;
++    public static final int DEFAULT_MAX_SSTABLE_SIZE_MIB = 160;
  
      @VisibleForTesting
      final LeveledManifest manifest;
@@@ -62,7 -62,7 +63,7 @@@
      public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
      {
          super(cfs, options);
--        int configuredMaxSSTableSize = 160;
++        int configuredMaxSSTableSize = DEFAULT_MAX_SSTABLE_SIZE_MIB;
          int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE;
          boolean configuredSingleSSTableUplevel = false;
          SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options);
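
The hunk above promotes the LCS option names to public and replaces the magic number 160 with DEFAULT_MAX_SSTABLE_SIZE_MIB. A minimal sketch of how test code might now build an LCS options map from these constants instead of hard-coded strings (the helper class and the "true" value are illustrative assumptions, not part of this commit):

    import java.util.Map;

    import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;

    final class LcsOptionsSketch
    {
        // Build an LCS options map keyed by the now-public constants; the
        // values shown are the shipped defaults plus an arbitrary uplevel flag.
        static Map<String, String> defaultLcsOptions()
        {
            return Map.of(LeveledCompactionStrategy.SSTABLE_SIZE_OPTION,
                          Integer.toString(LeveledCompactionStrategy.DEFAULT_MAX_SSTABLE_SIZE_MIB),
                          LeveledCompactionStrategy.LEVEL_FANOUT_SIZE_OPTION,
                          Integer.toString(LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE),
                          LeveledCompactionStrategy.SINGLE_SSTABLE_UPLEVEL_OPTION, "true");
        }
    }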
diff --cc test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 694729f8bd,3bded5cd16..bdff5b8900
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@@ -45,21 -40,15 +45,24 @@@ import java.util.regex.Matcher
  import java.util.regex.Pattern;
  import java.util.stream.Collectors;
  
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Strings;
 +import com.google.common.collect.ImmutableList;
  import com.google.common.util.concurrent.Futures;
 -
 -import org.apache.cassandra.distributed.api.Feature;
 -import org.apache.cassandra.gms.ApplicationState;
 -import org.apache.cassandra.gms.VersionedValue;
 -import org.apache.cassandra.io.util.File;
++import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
++import org.apache.cassandra.utils.FBUtilities;
++import org.apache.cassandra.utils.Shared;
 +import org.assertj.core.api.Assertions;
  import org.junit.Assert;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
 +import accord.primitives.TxnId;
 +import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.Feature;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.api.IInstance;
  import org.apache.cassandra.distributed.api.IInstanceConfig;
@@@ -1537,113 -960,60 +1540,150 @@@ public class ClusterUtil
  
      public static void preventSystemExit()
      {
 -        System.setSecurityManager(new SecurityManager()
 +        System.setSecurityManager(new PreventSystemExit());
 +    }
 +
 +    public static void awaitInPeers(Cluster cluster, int[] nodes, IInstance expectedInPeers)
 +    {
 +        for (IInvokableInstance inst : cluster.get(nodes))
          {
 -            @Override
 -            public void checkExit(int status)
 -            {
 -                throw new SystemExitException(status);
 -            }
 +            if (inst.config().num() == expectedInPeers.config().num()) continue; // ignore self as self is not in peers
 +            awaitInPeers(inst, expectedInPeers);
 +        }
 +    }
  
 -            @Override
 -            public void checkPermission(Permission perm)
 -            {
 -            }
 +    public static void awaitInPeers(IInstance instance, IInstance expectedInPeers)
 +    {
 +        for (int i = 0; i < 100; i++)
 +        {
 +            if (isInPeers(instance, expectedInPeers))
 +                return;
 +            sleepUninterruptibly(1, TimeUnit.SECONDS);
 +        }
 +        throw new AssertionError("Unable to find " + expectedInPeers.config().broadcastAddress() + " in peers");
 +    }
 +
 +    public static boolean isInPeers(IInstance instance, IInstance expectedInPeers)
 +    {
 +        SimpleQueryResult qr = instance.executeInternalWithResult("select tokens, data_center, rack from system.peers WHERE peer=?", expectedInPeers.config().broadcastAddress().getAddress());
 +        if (!qr.hasNext()) return false;
 +        Row row = qr.next();
 +        // peer is known, but is it fully defined?
 +        Collection<String> tokens = row.get("tokens");
 +        String dc = row.getString("data_center");
 +        String rack = row.getString("rack");
 +        return tokens != null && !tokens.isEmpty() && !Strings.isNullOrEmpty(dc) && !Strings.isNullOrEmpty(rack);
 +    }
 +
 +    public static StorageService.Mode mode(IInvokableInstance inst)
 +    {
 +        String name = inst.callOnInstance(() -> StorageService.instance.operationMode().name());
 +        return StorageService.Mode.valueOf(name);
 +    }
 +
 +    public static void assertModeJoined(IInvokableInstance inst)
 +    {
 +        Assertions.assertThat(mode(inst))
 +                .describedAs("Unexpected StorageService operation mode")
 +                .isEqualTo(StorageService.Mode.NORMAL);
 +    }
 +
 +    public static <T extends IInstance> LinkedHashMap<String, SimpleQueryResult> queryTxnState(AbstractCluster<T> cluster, TxnId txnId, int... nodes)
 +    {
 +        String cql = String.format("SELECT * FROM %s.%s WHERE txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_BLOCKED_BY);
 +        LinkedHashMap<String, SimpleQueryResult> map = new LinkedHashMap<>();
 +        Iterable<T> it = nodes.length == 0 ? cluster : cluster.get(nodes);
 +        for (T i : it)
 +        {
 +            if (i.isShutdown())
 +                continue;
 +            SimpleQueryResult result = i.executeInternalWithResult(cql, txnId.toString());
 +            map.put(i.toString(), result);
 +        }
 +        return map;
 +    }
 +
 +    public static <T extends IInstance> String queryTxnStateAsString(AbstractCluster<T> cluster, TxnId txnId, int... nodes)
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        queryTxnStateAsString(sb, cluster, txnId, nodes);
 +        return sb.toString();
 +    }
  
 -            @Override
 -            public void checkPermission(Permission perm, Object context)
 +    public static <T extends IInstance> void queryTxnStateAsString(StringBuilder sb, AbstractCluster<T> cluster, TxnId txnId, int... nodes)
 +    {
 +        LinkedHashMap<String, SimpleQueryResult> map = queryTxnState(cluster, txnId, nodes);
 +        for (var e : map.entrySet())
 +        {
 +            sb.append(e.getKey()).append(":\n");
 +            SimpleQueryResult result = e.getValue();
 +            if (!result.names().isEmpty())
 +                sb.append(result.names()).append('\n');
 +            while (result.hasNext())
              {
 +                var row = result.next();
 +                sb.append(Arrays.asList(row.toObjectArray())).append('\n');
              }
 +        }
 +    }
 +
 +    public static TableId tableId(Cluster cluster, String ks, String table)
 +    {
 +        String str = cluster.getFirstRunningInstance().callOnInstance(() -> Schema.instance.getKeyspaceInstance(ks).getColumnFamilyStore(table).getTableId().toString());
 +        return TableId.fromUUID(UUID.fromString(str));
 +    }
 +
 +    public static void awaitAccordEpochReady(Cluster cluster, long epoch)
 +    {
 +        cluster.forEach(i -> {
 +            if (i.isShutdown()) return;
 +            i.runOnInstance(() -> {
 +                try
 +                {
 +                    AccordService.instance().epochReady(Epoch.create(epoch)).get();
 +                }
 +                catch (InterruptedException | ExecutionException e)
 +                {
 +                    throw new RuntimeException(e);
 +                }
 +            });
          });
      }
+ 
+     @Shared
+     public static class Range implements Serializable
+     {
+         public final String left, right;
+ 
+         public Range(String left, String right)
+         {
+             this.left = left;
+             this.right = right;
+         }
+ 
+         public Range(long left, long right)
+         {
+             this(Long.toString(left), Long.toString(right));
+         }
+ 
+         public long left()
+         {
+             return Long.parseLong(left);
+         }
+ 
+         public long right()
+         {
+             return Long.parseLong(right);
+         }
+     }
+ 
+     public static List<Range> getPrimaryRanges(IInvokableInstance instance, String keyspace)
+     {
+         return instance.callOnInstance(() -> {
 -            var ranges = StorageService.instance.getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddressAndPort());
++            var ranges = TokenRingUtils.getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddressAndPort());
+             return ranges.stream()
+                     .flatMap(r -> r.unwrap().stream().map(r2 -> new Range(r2.left.toString(), r2.right.toString())))
+                     .collect(Collectors.toList());
+         });
+     }
  }
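
The ClusterUtils helpers above are consumed by the new PartialWritesWithRepairTest. A minimal usage sketch under the standard in-JVM dtest API; the node count, keyspace name, and surrounding test scaffolding are assumptions for illustration, not taken from this commit:

    try (Cluster cluster = Cluster.build(2).start())
    {
        // Block until node 2 is fully defined (tokens, DC and rack) in node 1's system.peers.
        ClusterUtils.awaitInPeers(cluster, new int[] { 1 }, cluster.get(2));
        // Capture node 1's primary ranges for a keyspace as serializable token pairs.
        List<ClusterUtils.Range> ranges = ClusterUtils.getPrimaryRanges(cluster.get(1), "distributed_test_keyspace");
    }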
diff --cc test/distributed/org/apache/cassandra/fuzz/sai/AccordFullMultiNodeSAITest.java
index 23b52833af,0000000000..291ef7dcaa
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/AccordFullMultiNodeSAITest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/AccordFullMultiNodeSAITest.java
@@@ -1,42 -1,0 +1,42 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.fuzz.sai;
 +
 +import org.junit.Ignore;
 +
 +import org.apache.cassandra.harry.SchemaSpec;
 +import org.apache.cassandra.harry.gen.Generator;
 +import org.apache.cassandra.harry.gen.SchemaGenerators;
 +import org.apache.cassandra.service.consensus.TransactionalMode;
 +
- @Ignore("CASSANDRA-20567: Repair is failing due to missing SAI index files 
when using zero copy streaming")
++@Ignore("It was believed that these tests were failing due to 
CASSANDRA-20567, but in fixing that issue it was found that the tests are still 
failing!  Harry is detecting an incorrect response...")
 +public class AccordFullMultiNodeSAITest extends MultiNodeSAITestBase
 +{
 +    public AccordFullMultiNodeSAITest()
 +    {
 +        super(TransactionalMode.full);
 +    }
 +
 +    @Override
 +    protected Generator<SchemaSpec> schemaGenerator(boolean disableReadRepair)
 +    {
 +        return SchemaGenerators.schemaSpecGen(KEYSPACE, "basic_sai", MAX_PARTITION_SIZE,
 +                                              SchemaSpec.optionsBuilder().disableReadRepair(disableReadRepair).withTransactionalMode(TransactionalMode.full));
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/fuzz/sai/AccordInteropMultiNodeSAITest.java
index ba937c0f91,0000000000..2d665eb47d
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/AccordInteropMultiNodeSAITest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/AccordInteropMultiNodeSAITest.java
@@@ -1,43 -1,0 +1,43 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.fuzz.sai;
 +
 +import org.junit.Ignore;
 +
 +import org.apache.cassandra.harry.SchemaSpec;
 +import org.apache.cassandra.harry.gen.Generator;
 +import org.apache.cassandra.harry.gen.SchemaGenerators;
 +import org.apache.cassandra.service.consensus.TransactionalMode;
 +
- @Ignore("CASSANDRA-20567: Repair is failing due to missing SAI index files 
when using zero copy streaming")
++@Ignore("It was believed that these tests were failing due to 
CASSANDRA-20567, but in fixing that issue it was found that the tests are still 
failing!  Harry is detecting an incorrect response...")
 +public class AccordInteropMultiNodeSAITest extends MultiNodeSAITestBase
 +{
 +    public AccordInteropMultiNodeSAITest()
 +    {
 +        super(TransactionalMode.test_interop_read);
 +    }
 +
 +    @Override
 +    protected Generator<SchemaSpec> schemaGenerator(boolean disableReadRepair)
 +    {
 +        return SchemaGenerators.schemaSpecGen(KEYSPACE, "basic_sai", MAX_PARTITION_SIZE,
 +                                              SchemaSpec.optionsBuilder().disableReadRepair(disableReadRepair)
 +                                                                         .withTransactionalMode(TransactionalMode.test_interop_read));
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/utils/CassandraGenerators.java
index 33ebdb6c3a,b064ec98fc..08b06c0730
--- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
@@@ -48,17 -38,6 +48,19 @@@ import javax.annotation.Nullable
  
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Sets;
++
++import org.apache.cassandra.db.compaction.LeveledManifest;
 +import org.apache.cassandra.schema.*;
 +import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 +import org.apache.cassandra.tcm.extensions.ExtensionKey;
 +import org.apache.cassandra.tcm.extensions.ExtensionValue;
 +import org.apache.cassandra.tcm.membership.Directory;
 +import org.apache.cassandra.tcm.ownership.DataPlacements;
 +import org.apache.cassandra.tcm.ownership.TokenMap;
 +import org.apache.cassandra.tcm.sequences.InProgressSequences;
 +import org.apache.cassandra.tcm.sequences.LockedRanges;
  import org.apache.commons.lang3.builder.MultilineRecursiveToStringStyle;
  import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
  
@@@ -302,600 -185,6 +304,628 @@@ public final class CassandraGenerator
          return new TableMetadataBuilder().withKeyspaceName(ks).build(rnd);
      }
  
 +    public static Gen<String> sstableFormatNames()
 +    {
 +        return SourceDSL.arbitrary().pick("big", "bti");
 +    }
 +
 +    public static Gen<SSTableFormat<?, ?>> sstableFormat()
 +    {
 +        // make sure ordering is deterministic, else repeatability breaks
 +        NavigableMap<String, SSTableFormat<?, ?>> formats = new TreeMap<>(DatabaseDescriptor.getSSTableFormats());
 +        return SourceDSL.arbitrary().pick(new ArrayList<>(formats.values()));
 +    }
 +
 +    public static class AbstractReplicationStrategyBuilder
 +    {
 +        public enum Strategy
 +        {
 +            Simple(true),
 +            NetworkTopology(true),
 +            Local(false),
 +            Meta(false);
 +
 +            public final boolean userAllowed;
 +
 +            Strategy(boolean userAllowed)
 +            {
 +                this.userAllowed = userAllowed;
 +            }
 +        }
 +
 +        private Gen<Strategy> strategyGen = SourceDSL.arbitrary().enumValues(Strategy.class);
 +        private Gen<String> keyspaceNameGen = KEYSPACE_NAME_GEN;
 +        private Gen<Integer> rfGen = SourceDSL.integers().between(1, 3);
 +        private Gen<List<String>> networkTopologyDCGen = rs -> {
 +            Gen<Integer> numDcsGen = SourceDSL.integers().between(1, 3);
 +            Gen<String> nameGen = IDENTIFIER_GEN;
 +            Set<String> dcs = new HashSet<>();
 +            int targetSize = numDcsGen.generate(rs);
 +            while (dcs.size() != targetSize)
 +                dcs.add(nameGen.generate(rs));
 +            List<String> ordered = new ArrayList<>(dcs);
 +            ordered.sort(Comparator.naturalOrder());
 +            return ordered;
 +        };
 +
 +        public AbstractReplicationStrategyBuilder withKeyspace(Gen<String> keyspaceNameGen)
 +        {
 +            this.keyspaceNameGen = keyspaceNameGen;
 +            return this;
 +        }
 +
 +        public AbstractReplicationStrategyBuilder withKeyspace(String keyspace)
 +        {
 +            this.keyspaceNameGen = i -> keyspace;
 +            return this;
 +        }
 +
 +        public AbstractReplicationStrategyBuilder withUserAllowed()
 +        {
 +            List<Strategy> allowed = Stream.of(Strategy.values()).filter(s -> s.userAllowed).collect(Collectors.toList());
 +            strategyGen = SourceDSL.arbitrary().pick(allowed);
 +            return this;
 +        }
 +
 +        public AbstractReplicationStrategyBuilder withRf(Gen<Integer> rfGen)
 +        {
 +            this.rfGen = rfGen;
 +            return this;
 +        }
 +
 +        public AbstractReplicationStrategyBuilder withRf(int rf)
 +        {
 +            this.rfGen = i -> rf;
 +            return this;
 +        }
 +
 +        public AbstractReplicationStrategyBuilder withDatacenters(Gen<List<String>> networkTopologyDCGen)
 +        {
 +            this.networkTopologyDCGen = networkTopologyDCGen;
 +            return this;
 +        }
 +
 +        public AbstractReplicationStrategyBuilder withDatacenters(String first, String... rest)
 +        {
 +            if (rest.length == 0)
 +            {
 +                this.networkTopologyDCGen = i -> Collections.singletonList(first);
 +            }
 +            else
 +            {
 +                List<String> all = new ArrayList<>(rest.length + 1);
 +                all.add(first);
 +                all.addAll(Arrays.asList(rest));
 +                this.networkTopologyDCGen = i -> all;
 +            }
 +            return this;
 +        }
 +
 +        public Gen<AbstractReplicationStrategy> build()
 +        {
 +            return rs -> {
 +                Strategy strategy = strategyGen.generate(rs);
 +                switch (strategy)
 +                {
 +                    case Simple:
 +                        return new SimpleStrategy(keyspaceNameGen.generate(rs),
 +                                                  ImmutableMap.of(SimpleStrategy.REPLICATION_FACTOR, rfGen.generate(rs).toString()));
 +                    case NetworkTopology:
 +                        ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
 +                        List<String> names = networkTopologyDCGen.generate(rs);
 +                        for (String name : names)
 +                            builder.put(name, rfGen.generate(rs).toString());
 +                        ImmutableMap<String, String> map = builder.build();
 +                        return new TestableNetworkTopologyStrategy(keyspaceNameGen.generate(rs), map);
 +                    case Meta:
 +                        return new MetaStrategy(keyspaceNameGen.generate(rs), ImmutableMap.of());
 +                    case Local:
 +                        return new LocalStrategy(keyspaceNameGen.generate(rs), ImmutableMap.of());
 +                    default:
 +                        throw new UnsupportedOperationException(strategy.name());
 +                }
 +            };
 +        }
 +    }
 +
 +    public static class TestableNetworkTopologyStrategy extends NetworkTopologyStrategy
 +    {
 +        public TestableNetworkTopologyStrategy(String keyspaceName, Map<String, String> configOptions) throws ConfigurationException
 +        {
 +            super(keyspaceName, configOptions);
 +        }
 +
 +        @Override
 +        public Collection<String> recognizedOptions(ClusterMetadata metadata)
 +        {
 +            return configOptions.keySet();
 +        }
 +    }
 +
 +    public static KeyspaceMetadataBuilder regularKeyspace()
 +    {
 +        return new KeyspaceMetadataBuilder().withKind(KeyspaceMetadata.Kind.REGULAR);
 +    }
 +
 +    public static class KeyspaceMetadataBuilder
 +    {
 +        private Gen<String> nameGen = KEYSPACE_NAME_GEN;
 +        private Gen<KeyspaceMetadata.Kind> kindGen = SourceDSL.arbitrary().enumValues(KeyspaceMetadata.Kind.class);
 +        private Gen<AbstractReplicationStrategyBuilder> replicationGen = i -> new AbstractReplicationStrategyBuilder();
 +        private Gen<Boolean> durableWritesGen = SourceDSL.booleans().all();
 +
 +        public KeyspaceMetadataBuilder withReplication(Gen<AbstractReplicationStrategyBuilder> replicationGen)
 +        {
 +            this.replicationGen = replicationGen;
 +            return this;
 +        }
 +
 +        public KeyspaceMetadataBuilder withReplication(AbstractReplicationStrategyBuilder replication)
 +        {
 +            this.replicationGen = i -> replication;
 +            return this;
 +        }
 +
 +        public KeyspaceMetadataBuilder withName(Gen<String> nameGen)
 +        {
 +            this.nameGen = nameGen;
 +            return this;
 +        }
 +
 +        public KeyspaceMetadataBuilder withName(String name)
 +        {
 +            this.nameGen = i -> name;
 +            return this;
 +        }
 +
 +        public KeyspaceMetadataBuilder withKind(Gen<KeyspaceMetadata.Kind> kindGen)
 +        {
 +            this.kindGen = kindGen;
 +            return this;
 +        }
 +
 +        public KeyspaceMetadataBuilder withKind(KeyspaceMetadata.Kind kind)
 +        {
 +            this.kindGen = i -> kind;
 +            return this;
 +        }
 +
 +        public Gen<KeyspaceMetadata> build()
 +        {
 +            return rs -> {
 +                String name = nameGen.generate(rs);
 +                KeyspaceMetadata.Kind kind = kindGen.generate(rs);
 +                AbstractReplicationStrategy replication = replicationGen.generate(rs).withKeyspace(nameGen).build().generate(rs);
 +                ReplicationParams replicationParams = ReplicationParams.fromStrategy(replication);
 +                boolean durableWrites = durableWritesGen.generate(rs);
 +                KeyspaceParams params = new KeyspaceParams(durableWrites, replicationParams, FastPathStrategy.simple());
 +                Tables tables = Tables.none();
 +                Views views = Views.none();
 +                Types types = Types.none();
 +                UserFunctions userFunctions = UserFunctions.none();
 +                return KeyspaceMetadata.createUnsafe(name, kind, params, tables, views, types, userFunctions);
 +            };
 +        }
 +    }
 +
 +    public static Gen<CachingParams> cachingParamsGen()
 +    {
 +        return rnd -> {
 +            boolean cacheKeys = nextBoolean(rnd);
 +            int rowsPerPartitionToCache;
 +            switch (SourceDSL.integers().between(1, 3).generate(rnd))
 +            {
 +                case 1: // ALL
 +                    rowsPerPartitionToCache = Integer.MAX_VALUE;
 +                    break;
 +                case 2: // NONE
 +                    rowsPerPartitionToCache = 0;
 +                    break;
 +                case 3: // num values
 +                    rowsPerPartitionToCache = Math.toIntExact(rnd.next(Constraint.between(1, Integer.MAX_VALUE - 1)));
 +                    break;
 +                default:
 +                    throw new AssertionError();
 +            }
 +            return new CachingParams(cacheKeys, rowsPerPartitionToCache);
 +        };
 +    }
 +
 +    public enum KnownCompactionAlgo
 +    {
 +        SizeTiered(SizeTieredCompactionStrategy.class),
 +        Leveled(LeveledCompactionStrategy.class),
 +        Unified(UnifiedCompactionStrategy.class);
 +        private final Class<? extends AbstractCompactionStrategy> klass;
 +
 +        KnownCompactionAlgo(Class<? extends AbstractCompactionStrategy> klass)
 +        {
 +            this.klass = klass;
 +        }
 +    }
 +
 +    public static class CompactionParamsBuilder
 +    {
 +        private Gen<KnownCompactionAlgo> algoGen = SourceDSL.arbitrary().enumValues(KnownCompactionAlgo.class);
 +        private Gen<CompactionParams.TombstoneOption> tombstoneOptionGen = SourceDSL.arbitrary().enumValues(CompactionParams.TombstoneOption.class);
 +        private Gen<Map<String, String>> sizeTieredOptions = rnd -> {
 +            if (nextBoolean(rnd)) return Map.of();
 +            Map<String, String> options = new HashMap<>();
 +            if (nextBoolean(rnd))
 +                // computes mb then converts to bytes
 +                options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, Long.toString(SourceDSL.longs().between(1, 100).generate(rnd) * 1024L * 1024L));
 +            if (nextBoolean(rnd))
 +                options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, Double.toString(SourceDSL.doubles().between(0.1, 0.9).generate(rnd)));
 +            if (nextBoolean(rnd))
 +                options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, Double.toString(SourceDSL.doubles().between(1.1, 1.9).generate(rnd)));
 +            return options;
 +        };
 +        private Gen<Map<String, String>> leveledOptions = rnd -> {
 +            if (nextBoolean(rnd)) return Map.of();
 +            Map<String, String> options = new HashMap<>();
 +            if (nextBoolean(rnd))
 +                options.putAll(sizeTieredOptions.generate(rnd));
++            int maxSSTableSizeInMB = LeveledCompactionStrategy.DEFAULT_MAX_SSTABLE_SIZE_MIB;
 +            if (nextBoolean(rnd))
++            {
 +                // size in mb
-                 options.put(LeveledCompactionStrategy.SSTABLE_SIZE_OPTION, SourceDSL.integers().between(1, 2_000).generate(rnd).toString());
++                maxSSTableSizeInMB = SourceDSL.integers().between(1, 2_000).generate(rnd);
++                options.put(LeveledCompactionStrategy.SSTABLE_SIZE_OPTION, Integer.toString(maxSSTableSizeInMB));
++            }
 +            if (nextBoolean(rnd))
-                 options.put(LeveledCompactionStrategy.LEVEL_FANOUT_SIZE_OPTION, SourceDSL.integers().between(1, 100).generate(rnd).toString());
++            {
++                // there is a relationship between sstable size and fanout, so respect it
++                // see CASSANDRA-20570: Leveled Compaction doesn't validate maxBytesForLevel when the table is altered/created
++                long maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024L * 1024L;
++                Gen<Integer> gen = SourceDSL.integers().between(1, 100);
++                Integer value = gen.generate(rnd);
++                while (true)
++                {
++                    try
++                    {
++                        // see org.apache.cassandra.db.compaction.LeveledGenerations.MAX_LEVEL_COUNT for why 8 is hard coded here
++                        LeveledManifest.maxBytesForLevel(8, value, maxSSTableSizeInBytes);
++                        break; // value is good, keep it
++                    }
++                    catch (RuntimeException e)
++                    {
++                        // this value is too large... let's shrink it
++                        if (value.intValue() == 1)
++                            throw new AssertionError("There is no possible fanout size that works with maxSSTableSizeInMB=" + maxSSTableSizeInMB);
++                        gen = SourceDSL.integers().between(1, value - 1);
++                        value = gen.generate(rnd);
++                    }
++                }
++                options.put(LeveledCompactionStrategy.LEVEL_FANOUT_SIZE_OPTION, value.toString());
++            }
 +            if (nextBoolean(rnd))
 +                options.put(LeveledCompactionStrategy.SINGLE_SSTABLE_UPLEVEL_OPTION, nextBoolean(rnd).toString());
 +            return options;
 +        };
 +        private Gen<Map<String, String>> unifiedOptions = rnd -> {
 +            if (nextBoolean(rnd)) return Map.of();
 +            Gen<String> storageSizeGen = Generators.filter(humanReadableStorageSimple(), s -> Controller.MIN_TARGET_SSTABLE_SIZE <= FBUtilities.parseHumanReadableBytes(s));
 +            Map<String, String> options = new HashMap<>();
 +            if (nextBoolean(rnd))
 +                options.put(Controller.BASE_SHARD_COUNT_OPTION, SourceDSL.integers().between(1, 10).generate(rnd).toString());
 +            if (nextBoolean(rnd))
 +                options.put(Controller.FLUSH_SIZE_OVERRIDE_OPTION, storageSizeGen.generate(rnd));
 +            if (nextBoolean(rnd))
 +                options.put(Controller.MAX_SSTABLES_TO_COMPACT_OPTION, SourceDSL.integers().between(0, 32).generate(rnd).toString());
 +            if (nextBoolean(rnd))
 +                options.put(Controller.SSTABLE_GROWTH_OPTION, SourceDSL.integers().between(0, 100).generate(rnd) + "%");
 +            if (nextBoolean(rnd))
 +                options.put(Controller.OVERLAP_INCLUSION_METHOD_OPTION, SourceDSL.arbitrary().enumValues(Overlaps.InclusionMethod.class).generate(rnd).name());
 +            if (nextBoolean(rnd))
 +            {
 +                int numLevels = SourceDSL.integers().between(1, 10).generate(rnd);
 +                String[] scalingParams = new String[numLevels];
 +                Gen<Integer> levelSize = SourceDSL.integers().between(2, 10);
 +                for (int i = 0; i < numLevels; i++)
 +                {
 +                    String value;
 +                    switch (SourceDSL.integers().between(0, 3).generate(rnd))
 +                    {
 +                        case 0:
 +                            value = "N";
 +                            break;
 +                        case 1:
 +                            value = "L" + levelSize.generate(rnd);
 +                            break;
 +                        case 2:
 +                            value = "T" + levelSize.generate(rnd);
 +                            break;
 +                        case 3:
 +                            value = SourceDSL.integers().all().generate(rnd).toString();
 +                            break;
 +                        default:
 +                            throw new AssertionError();
 +                    }
 +                    scalingParams[i] = value;
 +                }
 +                options.put(Controller.SCALING_PARAMETERS_OPTION, String.join(",", scalingParams));
 +            }
 +            if (nextBoolean(rnd))
 +            {
 +                // Calculate TARGET then compute the MIN from that.  The issue is that there is a hidden relationship
 +                // between these 2 fields more complex than simple comparability, MIN must be < 70% * TARGET!
 +                // See CASSANDRA-20398
 +                // 1MiB to 128MiB target
 +                long targetBytes = SourceDSL.longs().between(1L << 20, 1L << 27).generate(rnd);
 +                long limit = (long) Math.ceil(targetBytes * Math.sqrt(0.5));
 +                long minBytes = SourceDSL.longs().between(1, limit - 1).generate(rnd);
 +                options.put(Controller.MIN_SSTABLE_SIZE_OPTION, minBytes + "B");
 +                options.put(Controller.TARGET_SSTABLE_SIZE_OPTION, targetBytes + "B");
 +            }
 +            return options;
 +        };
 +        //TODO (coverage): doesn't look to validate > 1, what does that even mean?
 +        private Gen<Float> tombstoneThreshold = SourceDSL.floats().between(0, 1);
 +        private Gen<Boolean> uncheckedTombstoneCompaction = SourceDSL.booleans().all();
 +        private Gen<Boolean> onlyPurgeRepairedTombstones = SourceDSL.booleans().all();
 +
 +        public Gen<CompactionParams> build()
 +        {
 +            return rnd -> {
 +                KnownCompactionAlgo algo = algoGen.generate(rnd);
 +                Map<String, String> options = new HashMap<>();
 +                if (nextBoolean(rnd))
 +                    options.put(CompactionParams.Option.PROVIDE_OVERLAPPING_TOMBSTONES.toString(), tombstoneOptionGen.generate(rnd).name());
 +                if (CompactionParams.supportsThresholdParams(algo.klass) && nextBoolean(rnd))
 +                {
 +                    options.put(CompactionParams.Option.MIN_THRESHOLD.toString(), Long.toString(rnd.next(Constraint.between(2, 4))));
 +                    options.put(CompactionParams.Option.MAX_THRESHOLD.toString(), Long.toString(rnd.next(Constraint.between(5, 32))));
 +                }
 +                if (nextBoolean(rnd))
 +                    options.put(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION, tombstoneThreshold.generate(rnd).toString());
 +                if (nextBoolean(rnd))
 +                    options.put(AbstractCompactionStrategy.UNCHECKED_TOMBSTONE_COMPACTION_OPTION, uncheckedTombstoneCompaction.generate(rnd).toString());
 +                if (nextBoolean(rnd))
 +                    options.put(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES, onlyPurgeRepairedTombstones.generate(rnd).toString());
 +                switch (algo)
 +                {
 +                    case SizeTiered:
 +                        options.putAll(sizeTieredOptions.generate(rnd));
 +                        break;
 +                    case Leveled:
 +                        options.putAll(leveledOptions.generate(rnd));
 +                        break;
 +                    case Unified:
 +                        options.putAll(unifiedOptions.generate(rnd));
 +                        break;
 +                    default:
 +                        throw new UnsupportedOperationException(algo.name());
 +                }
 +                return CompactionParams.create(algo.klass, options);
 +            };
 +        }
 +    }
 +
 +    private static Boolean nextBoolean(RandomnessSource rnd)
 +    {
 +        return SourceDSL.booleans().all().generate(rnd);
 +    }
 +
 +    public static Gen<CompactionParams> compactionParamsGen()
 +    {
 +        return new CompactionParamsBuilder().build();
 +    }
 +
 +    public enum KnownCompressionAlgo
 +    {
 +        snappy("SnappyCompressor"),
 +        deflate("DeflateCompressor"),
 +        lz4("LZ4Compressor"),
 +        zstd("ZstdCompressor"),
 +        noop("NoopCompressor");
 +
 +        private final String compressor;
 +
 +        KnownCompressionAlgo(String compressor)
 +        {
 +            this.compressor = compressor;
 +        }
 +    }
 +
 +    public static class CompressionParamsBuilder
 +    {
 +        private Gen<Boolean> enabledGen = SourceDSL.booleans().all();
 +        private Gen<KnownCompressionAlgo> algoGen = SourceDSL.arbitrary().enumValues(KnownCompressionAlgo.class);
 +        private Gen<Map<String, String>> lz4OptionsGen = rnd -> {
 +            if (nextBoolean(rnd))
 +                return Map.of();
 +            Map<String, String> options = new HashMap<>();
 +            if (nextBoolean(rnd))
 +                options.put(LZ4Compressor.LZ4_COMPRESSOR_TYPE, nextBoolean(rnd) ? LZ4Compressor.LZ4_FAST_COMPRESSOR : LZ4Compressor.LZ4_HIGH_COMPRESSOR);
 +            if (nextBoolean(rnd))
 +                options.put(LZ4Compressor.LZ4_HIGH_COMPRESSION_LEVEL, Integer.toString(Math.toIntExact(rnd.next(Constraint.between(1, 17)))));
 +            return options;
 +        };
 +        private Gen<Map<String, String>> zstdOptionsGen = rnd -> {
 +            if (nextBoolean(rnd))
 +                return Map.of();
 +            int level = Math.toIntExact(rnd.next(Constraint.between(ZstdCompressor.FAST_COMPRESSION_LEVEL, ZstdCompressor.BEST_COMPRESSION_LEVEL)));
 +            return Map.of(ZstdCompressor.COMPRESSION_LEVEL_OPTION_NAME, Integer.toString(level));
 +        };
 +
 +        public Gen<CompressionParams> build()
 +        {
 +            return rnd -> {
 +                if (!enabledGen.generate(rnd))
 +                    return CompressionParams.noCompression();
 +                KnownCompressionAlgo algo = algoGen.generate(rnd);
 +                if (algo == KnownCompressionAlgo.noop)
 +                    return CompressionParams.noop();
 +                // when null disabled
 +                int chunkLength = CompressionParams.DEFAULT_CHUNK_LENGTH;
 +                double minCompressRatio = CompressionParams.DEFAULT_MIN_COMPRESS_RATIO;
 +                Map<String, String> options;
 +                switch (algo)
 +                {
 +                    case lz4:
 +                        options = lz4OptionsGen.generate(rnd);
 +                        break;
 +                    case zstd:
 +                        options = zstdOptionsGen.generate(rnd);
 +                        break;
 +                    default:
 +                        options = Map.of();
 +                }
 +                return new CompressionParams(algo.compressor, options, chunkLength, minCompressRatio);
 +            };
 +        }
 +    }
 +
 +    public static Gen<CompressionParams> compressionParamsGen()
 +    {
 +        return new CompressionParamsBuilder().build();
 +    }
 +
 +    public static class TableParamsBuilder
 +    {
 +        @Nullable
 +        private Gen<String> memtableKeyGen = null;
 +        @Nullable
 +        private Gen<CachingParams> cachingParamsGen = null;
 +        @Nullable
 +        private Gen<CompactionParams> compactionParamsGen = null;
 +        @Nullable
 +        private Gen<CompressionParams> compressionParamsGen = null;
 +        @Nullable
 +        private Gen<TransactionalMode> transactionalMode = null;
 +        @Nullable
 +        private Gen<FastPathStrategy> fastPathStrategy = null;
 +
 +        public TableParamsBuilder withKnownMemtables()
 +        {
 +            Set<String> known = MemtableParams.knownDefinitions();
 +            // for testing reasons, some invalid types are added; filter those out
 +            List<String> valid = known.stream().filter(name -> !name.startsWith("test_")).collect(Collectors.toList());
 +            memtableKeyGen = SourceDSL.arbitrary().pick(valid);
 +            return this;
 +        }
 +
 +        public TableParamsBuilder withCaching()
 +        {
 +            cachingParamsGen = cachingParamsGen();
 +            return this;
 +        }
 +
 +        public TableParamsBuilder withCompaction()
 +        {
 +            compactionParamsGen = compactionParamsGen();
 +            return this;
 +        }
 +
 +        public TableParamsBuilder withCompression()
 +        {
 +            compressionParamsGen = compressionParamsGen();
 +            return this;
 +        }
 +
 +        public TableParamsBuilder withTransactionalMode(Gen<TransactionalMode> transactionalMode)
 +        {
 +            this.transactionalMode = transactionalMode;
 +            return this;
 +        }
 +
 +        public TableParamsBuilder withTransactionalMode()
 +        {
 +            return withTransactionalMode(SourceDSL.arbitrary().enumValues(TransactionalMode.class));
 +        }
 +
 +        public TableParamsBuilder withTransactionalMode(TransactionalMode transactionalMode)
 +        {
 +            return withTransactionalMode(SourceDSL.arbitrary().constant(transactionalMode));
 +        }
 +
 +        public TableParamsBuilder withFastPathStrategy()
 +        {
 +            fastPathStrategy = rnd -> {
 +                FastPathStrategy.Kind kind = SourceDSL.arbitrary().enumValues(FastPathStrategy.Kind.class).generate(rnd);
 +                switch (kind)
 +                {
 +                    case SIMPLE:
 +                        return SimpleFastPathStrategy.instance;
 +                    case INHERIT_KEYSPACE:
 +                        return InheritKeyspaceFastPathStrategy.instance;
 +                    case PARAMETERIZED:
 +                    {
 +                        Map<String, String> map = new HashMap<>();
 +                        int size = SourceDSL.integers().between(1, Integer.MAX_VALUE).generate(rnd);
 +                        map.put(ParameterizedFastPathStrategy.SIZE, Integer.toString(size));
 +                        Set<String> names = new HashSet<>();
 +                        Gen<String> nameGen = SourceDSL.strings().allPossible().ofLengthBetween(1, 10)
 +                                                       // If : is in the name then the parser will fail; we have validation to disallow this
 +                                                       .map(s -> s.replace(":", "_"))
 +                                                       // Names are used for DCs and those are separated by ,
 +                                                       .map(s -> s.replace(",", "_"))
 +                                                       .assuming(s -> !s.trim().isEmpty());
 +                        int numNames = SourceDSL.integers().between(1, 10).generate(rnd);
 +                        for (int i = 0; i < numNames; i++)
 +                        {
 +                            while (!names.add(nameGen.generate(rnd)))
 +                            {
 +                            }
 +                        }
 +                        List<String> sortedNames = new ArrayList<>(names);
 +                        sortedNames.sort(Comparator.naturalOrder());
 +                        List<String> dcs = new ArrayList<>(names.size());
 +                        boolean auto = SourceDSL.booleans().all().generate(rnd);
 +                        if (auto)
 +                        {
 +                            dcs.addAll(sortedNames);
 +                        }
 +                        else
 +                        {
 +                            for (String name : sortedNames)
 +                            {
 +                                int weight = SourceDSL.integers().between(0, 10).generate(rnd);
 +                                dcs.add(name + ":" + weight);
 +                            }
 +                        }
 +                        // str: dcFormat(,dcFormat)*
 +                        //      dcFormat: name | weight
 +                        //      weight: int: >= 0
 +                        //      note: can't mix auto and user defined weight; need one or the other.  Names must be unique
 +                        map.put(ParameterizedFastPathStrategy.DCS, String.join(",", dcs));
 +                        return ParameterizedFastPathStrategy.fromMap(map);
 +                    }
 +                    default:
 +                        throw new UnsupportedOperationException(kind.name());
 +                }
 +            };
 +            return this;
 +        }
 +
 +        public Gen<TableParams> build()
 +        {
 +            return rnd -> {
 +                TableParams.Builder params = TableParams.builder();
 +                if (memtableKeyGen != null)
 +                    params.memtable(MemtableParams.get(memtableKeyGen.generate(rnd)));
 +                if (cachingParamsGen != null)
 +                    params.caching(cachingParamsGen.generate(rnd));
 +                if (compactionParamsGen != null)
 +                    params.compaction(compactionParamsGen.generate(rnd));
 +                if (compressionParamsGen != null)
 +                    params.compression(compressionParamsGen.generate(rnd));
 +                if (transactionalMode != null)
 +                    params.transactionalMode(transactionalMode.generate(rnd));
 +                if (fastPathStrategy != null)
 +                    params.fastPath(fastPathStrategy.generate(rnd));
 +                return params.build();
 +            };
 +        }
 +    }
 +
 +    public static TableMetadataBuilder regularTable()
 +    {
 +        return new TableMetadataBuilder()
 +               .withTableKinds(TableMetadata.Kind.REGULAR)
 +               .withKnownMemtables();
 +    }
 +
      public static class TableMetadataBuilder
      {
          private Gen<String> ksNameGen = CassandraGenerators.KEYSPACE_NAME_GEN;


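Two of the generator changes above encode hidden parameter relationships rather than sampling options independently: the LCS fanout is shrunk until LeveledManifest.maxBytesForLevel accepts it, and the UCS min size is derived from the target size. A standalone sketch of those constraints for illustration; the overflow check here is an approximation of the real validation, which lives in LeveledManifest.maxBytesForLevel and the UCS Controller:

    final class GeneratorConstraintsSketch
    {
        // LCS: the bytes allowed per level grow roughly as
        // sstableSizeBytes * fanout^level, so with the 8 levels that
        // LeveledGenerations supports the product must not overflow a long
        // (see CASSANDRA-20570).
        static boolean lcsFanoutFits(int fanout, long sstableSizeBytes)
        {
            double bytes = sstableSizeBytes;
            for (int level = 1; level <= 8; level++)
            {
                bytes *= fanout;
                if (bytes > (double) Long.MAX_VALUE)
                    return false; // LeveledManifest.maxBytesForLevel would throw here
            }
            return true;
        }

        // UCS: min_sstable_size must stay below sqrt(0.5), roughly 70%, of
        // target_sstable_size (see CASSANDRA-20398); this mirrors the limit
        // computed in unifiedOptions above.
        static boolean ucsMinSizeValid(long minBytes, long targetBytes)
        {
            return minBytes < (long) Math.ceil(targetBytes * Math.sqrt(0.5));
        }
    }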