This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e1e39b074ac61f293d652b516a70c73b65c7ac3f
Author: Marcus Eriksson <[email protected]>
AuthorDate: Fri Nov 21 14:48:35 2025 +0100

    Refactor the way we check if a transformation is allowed to be committed during upgrades
    
    Patch by marcuse and Sam Tunnicliffe; reviewed by Sam Tunnicliffe for CASSANDRA-21043
    
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 CHANGES.txt                                        |   1 +
 .../config/CassandraRelevantProperties.java        |   2 -
 .../statements/schema/AlterKeyspaceStatement.java  |   7 +
 .../statements/schema/AlterTableStatement.java     |  48 +++++++
 .../cql3/statements/schema/AlterTypeStatement.java |  19 +++
 .../cql3/statements/schema/AlterViewStatement.java |   7 +
 .../schema/CommentOnColumnStatement.java           |   8 ++
 .../schema/CommentOnKeyspaceStatement.java         |   7 +
 .../statements/schema/CommentOnTableStatement.java |   7 +
 .../schema/CommentOnUserTypeFieldStatement.java    |   7 +
 .../schema/CommentOnUserTypeStatement.java         |   7 +
 .../cql3/statements/schema/CopyTableStatement.java |   7 +
 .../schema/CreateAggregateStatement.java           |   7 +
 .../statements/schema/CreateFunctionStatement.java |   7 +
 .../statements/schema/CreateIndexStatement.java    |   7 +
 .../statements/schema/CreateKeyspaceStatement.java |   7 +
 .../statements/schema/CreateTableStatement.java    |   7 +
 .../statements/schema/CreateTriggerStatement.java  |   7 +
 .../statements/schema/CreateTypeStatement.java     |   7 +
 .../statements/schema/CreateViewStatement.java     |   7 +
 .../statements/schema/DropAggregateStatement.java  |   7 +
 .../statements/schema/DropFunctionStatement.java   |   7 +
 .../cql3/statements/schema/DropIndexStatement.java |   7 +
 .../statements/schema/DropKeyspaceStatement.java   |   7 +
 .../cql3/statements/schema/DropTableStatement.java |   6 +
 .../statements/schema/DropTriggerStatement.java    |   7 +
 .../cql3/statements/schema/DropTypeStatement.java  |   7 +
 .../cql3/statements/schema/DropViewStatement.java  |   7 +
 .../schema/SecurityLabelOnColumnStatement.java     |   6 +
 .../schema/SecurityLabelOnKeyspaceStatement.java   |   7 +
 .../schema/SecurityLabelOnTableStatement.java      |   7 +
 .../SecurityLabelOnUserTypeFieldStatement.java     |   7 +
 .../schema/SecurityLabelOnUserTypeStatement.java   |   7 +
 .../cassandra/io/sstable/CQLSSTableWriter.java     |  17 ++-
 .../cassandra/schema/SchemaTransformation.java     |  27 ++++
 .../cassandra/schema/SchemaTransformations.java    | 150 ++++++++++-----------
 .../cassandra/tcm/AbstractLocalProcessor.java      |   9 +-
 .../org/apache/cassandra/tcm/Transformation.java   | 140 +++++++++++--------
 .../cassandra/tcm/transformations/AlterSchema.java |   5 +
 .../tcm/transformations/CustomTransformation.java  |   6 +
 .../cassandra/tcm/transformations/Startup.java     |   6 -
 .../tcm/transformations/TriggerSnapshot.java       |   6 -
 .../tcm/transformations/cms/Initialize.java        |   6 -
 .../distributed/test/PaxosRepairTest.java          |   1 -
 .../distributed/test/log/CMSTestBase.java          |   5 +-
 .../test/log/ClusterMetadataTestHelper.java        |  15 ++-
 .../distributed/test/log/RegisterTest.java         |   5 +-
 .../distributed/test/ring/DecommissionTest.java    |  45 -------
 .../operations/InsertUpdateIfConditionTest.java    |   2 -
 .../SchemaChangeDuringRangeMovementTest.java       |   5 +-
 .../org/apache/cassandra/schema/SchemaTest.java    |  64 ++++++---
 .../apache/cassandra/schema/SchemaTestUtil.java    |  82 +++++++----
 .../apache/cassandra/tcm/ClusterMetadataTest.java  |  86 ++++++++++++
 .../io/sstable/StressCQLSSTableWriter.java         |  34 ++++-
 54 files changed, 724 insertions(+), 264 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f31d7c39f7..fa2b7a16c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 5.1
  * Improve isGossipOnlyMember and location lookup performance (CASSANDRA-21039)
+ * Refactor the way we check if a transformation is allowed to be committed during upgrades (CASSANDRA-21043)
  * Improve debug around paused and disabled compaction (CASSANDRA-20131,CASSANDRA-19728)
  * DiskUsageBroadcaster does not update usageInfo on node replacement (CASSANDRA-21033)
  * Reject PrepareJoin if tokens are already assigned (CASSANDRA-21006)
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index fc1c1c1462..99726f6b3d 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -579,8 +579,6 @@ public enum CassandraRelevantProperties
     // transactional cluster metadata relevant properties
     // TODO: not a fan of being forced to prefix these to satisfy the alphabetic ordering constraint
     //       but it makes sense to group logically related properties together
-
-    TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES("cassandra.allow_transformations_during_upgrades", "false"),
     /**
      * for testing purposes disable the automatic CMS reconfiguration after a 
bootstrap/replace/move operation
      */
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
index 94f2504653..5ef2b68a9c 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.utils.FBUtilities;
@@ -69,6 +70,12 @@ public final class AlterKeyspaceStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     public Keyspaces apply(ClusterMetadata metadata)
     {
         attrs.validate();
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index c7a57139d1..d9a1476a0b 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -176,6 +177,12 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
         {
             throw ire("Altering column types is no longer supported");
         }
+
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
     }
 
     /**
@@ -211,6 +218,12 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
                 ColumnMask.ensureEnabled();
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         @Override
         public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
@@ -301,6 +314,12 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             newColumns.forEach(c -> c.type.validate(state, "Column " + 
c.name));
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing 
columns", state);
@@ -461,6 +480,12 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             return 
keyspace.withSwapped(keyspace.tables.withSwapped(builder.build()));
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         private void dropColumn(KeyspaceMetadata keyspace, TableMetadata 
table, ColumnIdentifier column, boolean ifExists, TableMetadata.Builder builder)
         {
             ColumnMetadata currentColumn = table.getColumn(column);
@@ -522,6 +547,12 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             this.ifColumnsExists = ifColumnsExists;
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing 
columns", state);
@@ -640,6 +671,11 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             return 
next.unbuild().transactionalMigrationFrom(newMigrateFrom).build();
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
 
         public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
@@ -687,6 +723,12 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             super(keyspaceName, tableName, ifTableExists);
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, 
TableMetadata table, ClusterMetadata metadata)
         {
             if (!DatabaseDescriptor.enableDropCompactStorage())
@@ -816,6 +858,12 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             }
             return keyspace;
         }
+
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V5);
+        }
     }
 
     public static final class Raw extends CQLStatement.Raw
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
index fefe70e1c6..f089605338 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -128,6 +129,12 @@ public abstract class AlterTypeStatement extends 
AlterSchemaStatement
             this.state = state;
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         UserType apply(KeyspaceMetadata keyspace, UserType userType)
         {
             if (type.isCounter())
@@ -186,6 +193,12 @@ public abstract class AlterTypeStatement extends 
AlterSchemaStatement
             this.renamedFields = renamedFields;
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         UserType apply(KeyspaceMetadata keyspace, UserType userType)
         {
             List<String> dependentAggregates =
@@ -233,6 +246,12 @@ public abstract class AlterTypeStatement extends 
AlterSchemaStatement
             super(keyspaceName, typeName, ifExists);
         }
 
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+        }
+
         UserType apply(KeyspaceMetadata keyspace, UserType userType)
         {
             throw ire("Altering field types is no longer supported");
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java
index 831e46b0ca..6c13f78f5a 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -55,6 +56,12 @@ public final class AlterViewStatement extends 
AlterSchemaStatement
         this.state = state;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java
index 0bcb6088c8..329ee26e8f 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -63,6 +64,13 @@ public final class CommentOnColumnStatement extends 
SchemaDescriptionStatement
         this.columnName = columnName;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java
index 5def5af758..2d9766ea08 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 
@@ -54,6 +55,12 @@ public final class CommentOnKeyspaceStatement extends 
SchemaDescriptionStatement
         super(keyspaceName, comment, DescriptionType.COMMENT);
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java
index 0c2ced7daf..3118533218 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -60,6 +61,12 @@ public final class CommentOnTableStatement extends 
SchemaDescriptionStatement
         this.tableName = tableName;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java
index 049b2de7e1..d77c19d94e 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -61,6 +62,12 @@ public final class CommentOnUserTypeFieldStatement extends 
SchemaDescriptionStat
         this.fieldName = fieldName;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java
index c12d9918cc..9be3da5fec 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -59,6 +60,12 @@ public final class CommentOnUserTypeStatement extends 
SchemaDescriptionStatement
         this.typeName = typeName;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java
index bde06c35dc..9651c92a49 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java
@@ -57,6 +57,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 
 /**
@@ -111,6 +112,12 @@ public final class CopyTableStatement extends 
AlterSchemaStatement
         return new AuditLogContext(AuditLogEntryType.CREATE_TABLE_LIKE, 
targetKeyspace, targetTableName);
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V5);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
index 6428d85073..ff5fa8c02a 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -92,6 +93,12 @@ public final class CreateAggregateStatement extends 
AlterSchemaStatement
         this.ifNotExists = ifNotExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
index 20923687ff..7958d8e3e0 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -82,6 +83,12 @@ public final class CreateFunctionStatement extends 
AlterSchemaStatement
         this.ifNotExists = ifNotExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     // TODO: replace affected aggregates !!
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
index addf9b68ef..f36ec54262 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -122,6 +123,12 @@ public final class CreateIndexStatement extends 
AlterSchemaStatement
         this.state = state;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
index 1df860d123..a035da52da 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.schema.KeyspaceParams.Option;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 
@@ -61,6 +62,12 @@ public final class CreateKeyspaceStatement extends 
AlterSchemaStatement
         return super.cql();
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
index e452d36e11..328cb7542e 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -135,6 +136,12 @@ public final class CreateTableStatement extends 
AlterSchemaStatement
         return super.cql();
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     public Keyspaces apply(ClusterMetadata metadata)
     {
         Keyspaces schema = metadata.schema.getKeyspaces();
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
index 5441cb8952..ccbd4852eb 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
@@ -66,6 +67,12 @@ public final class CreateTriggerStatement extends 
AlterSchemaStatement
         }
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java
index bd63310f18..375e7e832e 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -76,6 +77,12 @@ public final class CreateTypeStatement extends 
AlterSchemaStatement
         }
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     public Keyspaces apply(ClusterMetadata metadata)
     {
         Keyspaces schema = metadata.schema.getKeyspaces();
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
index 4a4d903a33..3b204b236c 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -129,6 +130,12 @@ public final class CreateViewStatement extends 
AlterSchemaStatement
         this.state = state;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
index e61332ee12..16beede039 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 
@@ -65,6 +66,12 @@ public final class DropAggregateStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     public Keyspaces apply(ClusterMetadata metadata)
     {
         String name =
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
index 0353964683..c157e42107 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 
@@ -66,6 +67,12 @@ public final class DropFunctionStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java
index 06cbc4be99..0fde7c8fc9 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java
@@ -27,6 +27,7 @@ import 
org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -43,6 +44,12 @@ public final class DropIndexStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
index 0ab0447f6b..744c768010 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 
@@ -44,6 +45,12 @@ public final class DropKeyspaceStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java
index c9e6516c57..79b416ad26 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.sequences.DropAccordTable.TableReference;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.PrepareDropAccordTable;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
@@ -72,6 +73,11 @@ public final class DropTableStatement extends 
AlterSchemaStatement
         return InProgressSequences.finishInProgressSequences(ref);
     }
 
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     public Keyspaces apply(ClusterMetadata metadata)
     {
         Guardrails.dropTruncateTableEnabled.ensureEnabled(state);
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
index 15007c3b1e..77d003b9e5 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -43,6 +44,12 @@ public final class DropTriggerStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
index 105c8f5db8..ba904e4370 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
 import org.apache.cassandra.transport.Event.SchemaChange;
@@ -55,6 +56,12 @@ public final class DropTypeStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     // TODO: expand types into tuples in all dropped columns of all tables
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java
index 121575503c..0c07dc7907 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -42,6 +43,12 @@ public final class DropViewStatement extends 
AlterSchemaStatement
         this.ifExists = ifExists;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java
index 22cb81e6e9..9724f336a8 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -88,6 +89,11 @@ public final class SecurityLabelOnColumnStatement extends 
SchemaDescriptionState
         this.provider = provider;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
 
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java
index e9ab8625d1..1fa2051bbc 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 
@@ -80,6 +81,12 @@ public final class SecurityLabelOnKeyspaceStatement extends 
SchemaDescriptionSta
         this.provider = provider;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java
index 969014c8ab..9640640be2 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableParams;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -85,6 +86,12 @@ public final class SecurityLabelOnTableStatement extends 
SchemaDescriptionStatem
         this.provider = provider;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java
index fee6fa6b2f..e4aca100d5 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -87,6 +88,12 @@ public final class SecurityLabelOnUserTypeFieldStatement 
extends SchemaDescripti
         this.provider = provider;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java
index cd30e456a3..0a4d3ee4e9 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.transport.Event.SchemaChange.Target;
@@ -84,6 +85,12 @@ public final class SecurityLabelOnUserTypeStatement extends 
SchemaDescriptionSta
         this.provider = provider;
     }
 
+    @Override
+    public boolean compatibleWith(ClusterMetadata metadata)
+    {
+        return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V8);
+    }
+
     @Override
     public Keyspaces apply(ClusterMetadata metadata)
     {
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index c1043a9fb7..8236451eb8 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -84,6 +84,7 @@ import org.apache.cassandra.schema.Views;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.AlterSchema;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -857,7 +858,21 @@ public class CQLSSTableWriter implements Closeable
 
         private void commitKeyspaceMetadata(KeyspaceMetadata keyspaceMetadata)
         {
-            SchemaTransformation schemaTransformation = metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(keyspaceMetadata);
+            SchemaTransformation schemaTransformation = new 
SchemaTransformation()
+            {
+                @Override
+                public Keyspaces apply(ClusterMetadata metadata)
+                {
+                    return 
metadata.schema.getKeyspaces().withAddedOrUpdated(keyspaceMetadata);
+                }
+
+                @Override
+                public boolean compatibleWith(ClusterMetadata metadata)
+                {
+                    return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+                }
+            };
+
             ClusterMetadataService.instance().commit(new 
AlterSchema(schemaTransformation));
         }
 
diff --git a/src/java/org/apache/cassandra/schema/SchemaTransformation.java 
b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
index 75ab3767ab..38b66434bb 100644
--- a/src/java/org/apache/cassandra/schema/SchemaTransformation.java
+++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java
@@ -46,6 +46,33 @@ public interface SchemaTransformation
      */
     Keyspaces apply(ClusterMetadata metadata);
 
+    /**
+     * Should return true if the implementing schema transformation is 
compatible with the given cluster metadata.
+     * Specifically, it should use the Directory to ensure that all current 
members are capable of both deserializing
+     * and enacting the transformation. If this returns false, the {@link 
org.apache.cassandra.tcm.log.Entry} containing
+     * the schema transformation will be blocked from being committed to the 
metadata log.
+     *
+     * For example, if an entirely new schema transformation is introduced, a 
new metadata serialization version
+     * (see {@link Version}) must be added and the implementation of {@code 
compatibleWith} in that schema
+     * transformation must return true only if the {@code 
commonSerializationVersion} of
+     * {@link ClusterMetadata#directory} is at least equal to the new {@code 
Version}.
+     *
+     * Similar constraints apply if a new feature is added to an existing 
transformation. Depending on the specifics of
+     * the feature implementation, a new serialization {@link Version} may not 
be required but the transformation should
+     * be able to signal that a minimum Cassandra version is required before 
the transformation can be successfully
+     * performed.
+     * An illustration of this scenario could be adding a new compression 
implementation in a minor release. No
+     * serialization changes are required as compression params are 
represented by a simple {@code Map<String,String>}
+     * but if a table is created or altered to use a new compression class 
during the minor upgrade, the unupgraded
+     * nodes may not be able to enact the transformation, requiring them to 
complete the upgrade synchronously. To
+     * mitigate this, the {@code compatibleWith} implementation in
+     * {@link 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement} and
+     * {@link org.apache.cassandra.cql3.statements.schema.AlterTableStatement} 
would inspect the
+     * {@link org.apache.cassandra.utils.CassandraVersion} of {@code 
clusterMinVersion} provided by
+     * {@link ClusterMetadata#directory} to ensure all members are 
sufficiently capable.
+     */
+    boolean compatibleWith(ClusterMetadata metadata);
+
     default String cql()
     {
         return "null";
diff --git a/src/java/org/apache/cassandra/schema/SchemaTransformations.java 
b/src/java/org/apache/cassandra/schema/SchemaTransformations.java
index c6996199e5..f0b9fe6105 100644
--- a/src/java/org/apache/cassandra/schema/SchemaTransformations.java
+++ b/src/java/org/apache/cassandra/schema/SchemaTransformations.java
@@ -23,10 +23,8 @@ import java.util.Optional;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.cql3.CQLStatement;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 
@@ -35,14 +33,6 @@ import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidReq
  */
 public class SchemaTransformations
 {
-    public static SchemaTransformation fromCql(String cql)
-    {
-        CQLStatement statement = QueryProcessor.getStatement(cql, 
ClientState.forInternalCalls());
-        if (!(statement instanceof SchemaTransformation))
-            throw new IllegalArgumentException("Can not deserialize schema 
transformation");
-        return (SchemaTransformation) statement;
-    }
-
     /**
      * Creates a schema transformation that adds the provided keyspace.
      *
@@ -54,19 +44,29 @@ public class SchemaTransformations
      */
     public static SchemaTransformation addKeyspace(KeyspaceMetadata keyspace, 
boolean ignoreIfExists)
     {
-        return (metadata) ->
+        return new SchemaTransformation()
         {
-            Keyspaces schema = metadata.schema.getKeyspaces();
-            KeyspaceMetadata existing = schema.getNullable(keyspace.name);
-            if (existing != null)
+            @Override
+            public Keyspaces apply(ClusterMetadata metadata)
             {
-                if (ignoreIfExists)
-                    return schema;
+                Keyspaces schema = metadata.schema.getKeyspaces();
+                KeyspaceMetadata existing = schema.getNullable(keyspace.name);
+                if (existing != null)
+                {
+                    if (ignoreIfExists)
+                        return schema;
+
+                    throw new AlreadyExistsException(keyspace.name);
+                }
 
-                throw new AlreadyExistsException(keyspace.name);
+                return schema.withAddedOrUpdated(keyspace);
             }
 
-            return schema.withAddedOrUpdated(keyspace);
+            @Override
+            public boolean compatibleWith(ClusterMetadata metadata)
+            {
+                return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+            }
         };
     }
 
@@ -81,84 +81,74 @@ public class SchemaTransformations
      */
     public static SchemaTransformation addTable(TableMetadata table, boolean 
ignoreIfExists)
     {
-        return (metadata) ->
+        return new SchemaTransformation()
         {
-            Keyspaces schema = metadata.schema.getKeyspaces();
-            KeyspaceMetadata keyspace = schema.getNullable(table.keyspace);
-            if (keyspace == null)
-                throw invalidRequest("Keyspace '%s' doesn't exist", 
table.keyspace);
-
-            if (keyspace.hasTable(table.name))
+            @Override
+            public Keyspaces apply(ClusterMetadata metadata)
             {
-                if (ignoreIfExists)
-                    return schema;
+                Keyspaces schema = metadata.schema.getKeyspaces();
+                KeyspaceMetadata keyspace = schema.getNullable(table.keyspace);
+                if (keyspace == null)
+                    throw invalidRequest("Keyspace '%s' doesn't exist", 
table.keyspace);
 
-                throw new AlreadyExistsException(table.keyspace, table.name);
-            }
+                if (keyspace.hasTable(table.name))
+                {
+                    if (ignoreIfExists)
+                        return schema;
 
-            table.validate();
+                    throw new AlreadyExistsException(table.keyspace, 
table.name);
+                }
+
+                table.validate();
+
+                return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table)));
+            }
 
-            return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table)));
+            @Override
+            public boolean compatibleWith(ClusterMetadata metadata)
+            {
+                return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+            }
         };
     }
 
     public static SchemaTransformation addTypes(Types toAdd, boolean 
ignoreIfExists)
     {
-        return (metadata) ->
+        return new SchemaTransformation()
         {
-            Keyspaces schema = metadata.schema.getKeyspaces();
-            if (toAdd.isEmpty())
-                return schema;
+            @Override
+            public Keyspaces apply(ClusterMetadata metadata)
+            {
+                Keyspaces schema = metadata.schema.getKeyspaces();
+                if (toAdd.isEmpty())
+                    return schema;
 
-            String keyspaceName = toAdd.iterator().next().keyspace;
-            KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
-            if (null == keyspace)
-                throw invalidRequest("Keyspace '%s' doesn't exist", 
keyspaceName);
+                String keyspaceName = toAdd.iterator().next().keyspace;
+                KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
+                if (null == keyspace)
+                    throw invalidRequest("Keyspace '%s' doesn't exist", 
keyspaceName);
 
-            Types types = keyspace.types;
-            for (UserType type : toAdd)
-            {
-                if (types.containsType(type.name))
+                Types types = keyspace.types;
+                for (UserType type : toAdd)
                 {
-                    if (ignoreIfExists)
-                        continue;
+                    if (types.containsType(type.name))
+                    {
+                        if (ignoreIfExists)
+                            continue;
 
-                    throw new ConfigurationException("Type " + type + " 
already exists in " + keyspaceName);
-                }
+                        throw new ConfigurationException("Type " + type + " 
already exists in " + keyspaceName);
+                    }
 
-                types = types.with(type);
+                    types = types.with(type);
+                }
+                return schema.withAddedOrReplaced(keyspace.withSwapped(types));
             }
-            return schema.withAddedOrReplaced(keyspace.withSwapped(types));
-        };
-    }
-
-    /**
-     * Creates a schema transformation that adds the provided view.
-     *
-     * @param view           the view to add.
-     * @param ignoreIfExists if {@code true}, the transformation is a no-op if 
a view of the same name than
-     *                       {@code view} already exists in the schema the 
transformation is applied on. Otherwise,
-     *                       the transformation throws an {@link 
AlreadyExistsException} in that case.
-     * @return the created transformation.
-     */
-    public static SchemaTransformation addView(ViewMetadata view, boolean 
ignoreIfExists)
-    {
-        return (metadata) ->
-        {
-            Keyspaces schema = metadata.schema.getKeyspaces();
-            KeyspaceMetadata keyspace = schema.getNullable(view.keyspace());
-            if (keyspace == null)
-                throw invalidRequest("Cannot add view to non existing keyspace 
'%s'", view.keyspace());
 
-            if (keyspace.hasView(view.name()))
+            @Override
+            public boolean compatibleWith(ClusterMetadata metadata)
             {
-                if (ignoreIfExists)
-                    return schema;
-
-                throw new AlreadyExistsException(view.keyspace(), view.name());
+                return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
             }
-
-            return 
schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.views.with(view)));
         };
     }
 
@@ -184,6 +174,12 @@ public class SchemaTransformations
                 return Optional.of(generation);
             }
 
+            @Override
+            public boolean compatibleWith(ClusterMetadata metadata)
+            {
+                return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+            }
+
             @Override
             public Keyspaces apply(ClusterMetadata metadata)
             {
diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java 
b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
index 72d46a5af5..0f42bae462 100644
--- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
@@ -23,7 +23,6 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
@@ -65,11 +64,11 @@ public abstract class AbstractLocalProcessor implements 
Processor
             }
 
             Transformation.Result result;
-            if 
(!CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES.getBoolean()
 &&
-                !transform.allowDuringUpgrades() &&
-                previous.metadataSerializationUpgradeInProgress())
+            if (!transform.eligibleToCommit(previous))
             {
-                result = new Transformation.Rejected(INVALID, "Upgrade in 
progress, can't commit " + transform);
+                result = new Transformation.Rejected(INVALID, "Transformation 
rejected, can't commit " + transform +
+                                                              " it not 
supported with cluster common serialization version " + 
previous.directory.commonSerializationVersion +
+                                                              " and min/max 
serialization versions " + previous.directory.clusterMinVersion + "/" + 
previous.directory.clusterMaxVersion);
             }
             else
             {
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index fc519e6edb..200d19e990 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -69,6 +69,8 @@ import 
org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
 import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS;
 import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
 
+import static org.apache.cassandra.tcm.serialization.Version.V0;
+
 /**
  * Implementations should be pure transformations from one ClusterMetadata 
state to another. They are likely to be
  * replayed during startup to rebuild the node's current state and so should 
be free of side effects and should not
@@ -91,9 +93,28 @@ public interface Transformation
      */
     Result execute(ClusterMetadata metadata);
 
-    default boolean allowDuringUpgrades()
+    /**
+     * Returns true if the type of Transformation (as identified by its KIND) 
is eligible to be committed to the
+     * metadata log given the state of the supplied ClusterMetadata. This is 
to ensure that newly introduced
+     * transformations can not be added to the log until all current members 
are able to read and enact them.
+     *
+     * The default implementation works in conjunction with {@link 
Kind#introducedIn}. It checks whether the lowest
+     * common serialization version in the cluster is at least equal to the 
minimum version required to deserialize
+     * transformations of that Kind.
+     *
+     * Specific Transformation implementations are free to consider other 
factors if they override this method. For
+     * example, they may inspect the Cassandra release versions as well as or 
instead of supported serialization
+     * versions.
+     *
+     * Note: this check pertains only to the KIND of Transformation. Even if 
the Kind is eligible to be applied, a
+     * specific instance of that kind may still be rejected based on its 
parameters and/or current cluster state.
+     * e.g. Transforms with a Kind of PREPARE_JOIN are eligible to be 
committed as that Kind's {@link Kind#introducedIn}
+     * is Version.V0, but a specific PrepareJoin may still return a 
Rejected result if the target node is already
+     * in a JOINED state.
+     */
+    default boolean eligibleToCommit(ClusterMetadata metadata)
     {
-        return false;
+        return 
kind().introducedIn.isEqualOrBefore(metadata.directory.commonSerializationVersion);
     }
 
     static Success success(ClusterMetadata.Transformer transformer, 
LockedRanges.AffectedRanges affectedRanges)
@@ -194,69 +215,81 @@ public interface Transformation
 
     enum Kind
     {
-        PRE_INITIALIZE_CMS(0, () -> PreInitialize.serializer),
-        INITIALIZE_CMS(1, () -> Initialize.serializer),
-        FORCE_SNAPSHOT(2, () -> ForceSnapshot.serializer),
-        TRIGGER_SNAPSHOT(3, () -> TriggerSnapshot.serializer),
-        SCHEMA_CHANGE(4, () -> AlterSchema.serializer),
-        REGISTER(5, () -> Register.serializer),
-        UNREGISTER(6, () -> Unregister.serializer),
-
-        UNSAFE_JOIN(7, () -> UnsafeJoin.serializer),
-        PREPARE_JOIN(8, () -> PrepareJoin.serializer),
-        START_JOIN(9, () -> PrepareJoin.StartJoin.serializer),
-        MID_JOIN(10, () -> PrepareJoin.MidJoin.serializer),
-        FINISH_JOIN(11, () -> PrepareJoin.FinishJoin.serializer),
-
-        PREPARE_MOVE(12, () -> PrepareMove.serializer),
-        START_MOVE(13, () -> PrepareMove.StartMove.serializer),
-        MID_MOVE(14, () -> PrepareMove.MidMove.serializer),
-        FINISH_MOVE(15, () -> PrepareMove.FinishMove.serializer),
-
-        PREPARE_LEAVE(16, () -> PrepareLeave.serializer),
-        START_LEAVE(17, () -> PrepareLeave.StartLeave.serializer),
-        MID_LEAVE(18, () -> PrepareLeave.MidLeave.serializer),
-        FINISH_LEAVE(19, () -> PrepareLeave.FinishLeave.serializer),
-        ASSASSINATE(20, () -> Assassinate.serializer),
-
-        PREPARE_REPLACE(21, () -> PrepareReplace.serializer),
-        START_REPLACE(22, () -> PrepareReplace.StartReplace.serializer),
-        MID_REPLACE(23, () -> PrepareReplace.MidReplace.serializer),
-        FINISH_REPLACE(24, () -> PrepareReplace.FinishReplace.serializer),
-
-        CANCEL_SEQUENCE(25, () -> CancelInProgressSequence.serializer),
+        PRE_INITIALIZE_CMS(0, V0, () -> PreInitialize.serializer),
+        INITIALIZE_CMS(1, V0, () -> Initialize.serializer),
+        FORCE_SNAPSHOT(2, V0, () -> ForceSnapshot.serializer),
+        TRIGGER_SNAPSHOT(3, V0, () -> TriggerSnapshot.serializer),
+        SCHEMA_CHANGE(4, V0, () -> AlterSchema.serializer),
+        REGISTER(5, V0, () -> Register.serializer),
+        UNREGISTER(6, V0, () -> Unregister.serializer),
+
+        UNSAFE_JOIN(7, V0, () -> UnsafeJoin.serializer),
+        PREPARE_JOIN(8, V0, () -> PrepareJoin.serializer),
+        START_JOIN(9, V0, () -> PrepareJoin.StartJoin.serializer),
+        MID_JOIN(10, V0, () -> PrepareJoin.MidJoin.serializer),
+        FINISH_JOIN(11, V0, () -> PrepareJoin.FinishJoin.serializer),
+
+        PREPARE_MOVE(12, V0, () -> PrepareMove.serializer),
+        START_MOVE(13, V0, () -> PrepareMove.StartMove.serializer),
+        MID_MOVE(14, V0, () -> PrepareMove.MidMove.serializer),
+        FINISH_MOVE(15, V0, () -> PrepareMove.FinishMove.serializer),
+
+        PREPARE_LEAVE(16, V0, () -> PrepareLeave.serializer),
+        START_LEAVE(17, V0, () -> PrepareLeave.StartLeave.serializer),
+        MID_LEAVE(18, V0, () -> PrepareLeave.MidLeave.serializer),
+        FINISH_LEAVE(19, V0, () -> PrepareLeave.FinishLeave.serializer),
+        ASSASSINATE(20, V0, () -> Assassinate.serializer),
+
+        PREPARE_REPLACE(21, V0, () -> PrepareReplace.serializer),
+        START_REPLACE(22, V0, () -> PrepareReplace.StartReplace.serializer),
+        MID_REPLACE(23, V0, () -> PrepareReplace.MidReplace.serializer),
+        FINISH_REPLACE(24, V0, () -> PrepareReplace.FinishReplace.serializer),
+
+        CANCEL_SEQUENCE(25, V0, () -> CancelInProgressSequence.serializer),
 
         @Deprecated(since = "CEP-21")
-        START_ADD_TO_CMS(26, () -> StartAddToCMS.serializer),
+        START_ADD_TO_CMS(26, V0, () -> StartAddToCMS.serializer),
         @Deprecated(since = "CEP-21")
-        FINISH_ADD_TO_CMS(27, () -> FinishAddToCMS.serializer),
+        FINISH_ADD_TO_CMS(27, V0, () -> FinishAddToCMS.serializer),
         @Deprecated(since = "CEP-21")
-        REMOVE_FROM_CMS(28, () -> RemoveFromCMS.serializer),
+        REMOVE_FROM_CMS(28, V0, () -> RemoveFromCMS.serializer),
 
-        STARTUP(29, () -> Startup.serializer),
+        STARTUP(29, V0, () -> Startup.serializer),
 
-        CUSTOM(30, () -> CustomTransformation.serializer),
+        CUSTOM(30, V0, () -> CustomTransformation.serializer),
 
-        PREPARE_SIMPLE_CMS_RECONFIGURATION(31, () -> 
PrepareCMSReconfiguration.Simple.serializer),
-        PREPARE_COMPLEX_CMS_RECONFIGURATION(32, () -> 
PrepareCMSReconfiguration.Complex.serializer),
-        ADVANCE_CMS_RECONFIGURATION(33, () -> 
AdvanceCMSReconfiguration.serializer),
-        CANCEL_CMS_RECONFIGURATION(34, () -> 
CancelCMSReconfiguration.serializer),
-        ALTER_TOPOLOGY(35, () -> AlterTopology.serializer),
+        PREPARE_SIMPLE_CMS_RECONFIGURATION(31, V0, () -> 
PrepareCMSReconfiguration.Simple.serializer),
+        PREPARE_COMPLEX_CMS_RECONFIGURATION(32, V0, () -> 
PrepareCMSReconfiguration.Complex.serializer),
+        ADVANCE_CMS_RECONFIGURATION(33, V0, () -> 
AdvanceCMSReconfiguration.serializer),
+        CANCEL_CMS_RECONFIGURATION(34, V0, () -> 
CancelCMSReconfiguration.serializer),
+        ALTER_TOPOLOGY(35, V0, () -> AlterTopology.serializer),
 
-        UPDATE_AVAILABILITY(36, () -> ReconfigureAccordFastPath.serializer),
+        UPDATE_AVAILABILITY(36, Version.MIN_ACCORD_VERSION, () -> 
ReconfigureAccordFastPath.serializer),
 
-        BEGIN_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(37, () -> 
BeginConsensusMigrationForTableAndRange.serializer),
-        MAYBE_FINISH_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(38, () -> 
MaybeFinishConsensusMigrationForTableAndRange.serializer),
-        ACCORD_MARK_STALE(39, () -> AccordMarkStale.serializer),
-        ACCORD_MARK_REJOINING(40, () -> AccordMarkRejoining.serializer),
-        PREPARE_DROP_ACCORD_TABLE(41, () -> PrepareDropAccordTable.serializer),
-        FINISH_DROP_ACCORD_TABLE(42, () -> FinishDropAccordTable.serializer),
-        ACCORD_MARK_HARD_REMOVED(43, () -> AccordMarkHardRemoved.serializer),
+        BEGIN_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(37, 
Version.MIN_ACCORD_VERSION, () -> 
BeginConsensusMigrationForTableAndRange.serializer),
+        MAYBE_FINISH_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(38, 
Version.MIN_ACCORD_VERSION, () -> 
MaybeFinishConsensusMigrationForTableAndRange.serializer),
+        ACCORD_MARK_STALE(39, Version.MIN_ACCORD_VERSION, () -> 
AccordMarkStale.serializer),
+        ACCORD_MARK_REJOINING(40, Version.MIN_ACCORD_VERSION, () -> 
AccordMarkRejoining.serializer),
+        PREPARE_DROP_ACCORD_TABLE(41, Version.MIN_ACCORD_VERSION, () -> 
PrepareDropAccordTable.serializer),
+        FINISH_DROP_ACCORD_TABLE(42, Version.MIN_ACCORD_VERSION, () -> 
FinishDropAccordTable.serializer),
+        ACCORD_MARK_HARD_REMOVED(43, Version.MIN_ACCORD_VERSION, () -> 
AccordMarkHardRemoved.serializer),
         ;
 
-        private final Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer;
+        /**
+         * The metadata serialization version which was current (as defined by
+         * {@link 
org.apache.cassandra.tcm.membership.NodeVersion#CURRENT_METADATA_VERSION}) when 
a transformation was
+         * first introduced. During an upgrade this is used to prevent new 
transformations being committed by upgraded
+         * nodes as nodes still running the older version will not be able to 
deserialize them.
+         *
+         * This should generally be left as the version in which the 
transformation was first added, and not be bumped when
+         * the implementation is modified. Adding Version guards to the 
associated {@link AsymmetricMetadataSerializer}
+         * is the way to handle this type of evolution.
+         */
+        public final Version introducedIn;
         public final int id;
 
+        private final Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer;
+
         private static final Kind[] idToKindMap;
 
         static
@@ -271,10 +304,11 @@ public interface Transformation
             idToKindMap = idMap;
         }
 
-        Kind(int id, Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer)
+        Kind(int id, Version introducedIn, 
Supplier<AsymmetricMetadataSerializer<Transformation, ? extends 
Transformation>> serializer)
         {
             this.serializer = serializer;
             this.id = id;
+            this.introducedIn = introducedIn;
         }
 
         public static Kind fromId(int id)
diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java 
b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
index 09c5b68bb5..f2a8389c60 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
@@ -98,6 +98,16 @@ public class AlterSchema implements Transformation
         return Kind.SCHEMA_CHANGE;
     }
 
+    /**
+     * Replaces the Kind-based default check: eligibility is determined by whether the
+     * wrapped schema transformation reports itself compatible with the given metadata.
+     */
+    @Override
+    public boolean eligibleToCommit(ClusterMetadata metadata)
+    {
+        return schemaTransformation.compatibleWith(metadata);
+    }
+
     @Override
     public final Result execute(ClusterMetadata prev)
     {
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java 
b/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java
index bcacba652d..bbe09699d5 100644
--- 
a/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java
+++ 
b/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java
@@ -109,6 +109,12 @@ public class CustomTransformation implements Transformation
         return Kind.CUSTOM;
     }
 
+    @Override
+    public boolean eligibleToCommit(ClusterMetadata metadata)
+    {
+        return child.eligibleToCommit(metadata);
+    }
+
     public Result execute(ClusterMetadata prev)
     {
         return child.execute(prev);
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Startup.java 
b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
index 7a2ecb34c5..a1abfdbff4 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
@@ -126,12 +126,6 @@ public class Startup implements Transformation
                '}';
     }
 
-    @Override
-    public boolean allowDuringUpgrades()
-    {
-        return true;
-    }
-
     public static void maybeExecuteStartupTransformation(NodeId localNodeId)
     {
         Directory directory = ClusterMetadata.current().directory;
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java 
b/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java
index 6c53a6618b..98f0f94f27 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java
@@ -48,12 +48,6 @@ public class TriggerSnapshot implements Transformation
         return Kind.TRIGGER_SNAPSHOT;
     }
 
-    @Override
-    public boolean allowDuringUpgrades()
-    {
-        return true;
-    }
-
     @Override
     public Result execute(ClusterMetadata prev)
     {
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
index 9564372c6f..11c2ed4d31 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
@@ -61,12 +61,6 @@ public class Initialize extends ForceSnapshot
         super(baseState);
     }
 
-    @Override
-    public boolean allowDuringUpgrades()
-    {
-        return true;
-    }
-
     public Kind kind()
     {
         return Kind.INITIALIZE_CMS;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
index d18f3a6f06..ad750b0b7d 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
@@ -123,7 +123,6 @@ public class PaxosRepairTest extends TestBaseImpl
     {
         CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION.setBoolean(false);
         
CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true);
-        
CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES.setBoolean(true);
 // for paxosRepairVersionGate
         DatabaseDescriptor.daemonInitialization();
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
index c89b52c047..bfec054085 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.locator.MetaStrategy;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.SchemaTestUtil;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Commit;
@@ -97,9 +98,7 @@ public class CMSTestBase
                 }
 
             });
-            service.commit(new AlterSchema((cm) -> {
-                return 
cm.schema.getKeyspaces().with(Keyspaces.of(KeyspaceMetadata.create("test", 
rf.asKeyspaceParams())));
-            }));
+            service.commit(new 
AlterSchema(SchemaTestUtil.toTransformation(metadata -> 
metadata.schema.getKeyspaces().with(Keyspaces.of(KeyspaceMetadata.create("test",
 rf.asKeyspaceParams()))))));
         }
 
         public void close()
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 0acd6895b8..43c147c255 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -54,8 +54,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.MemtableParams;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SchemaTestUtil;
 import org.apache.cassandra.schema.SchemaTransformation;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.accord.AccordStaleReplicas;
@@ -100,6 +100,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Throwables;
 
+import static org.apache.cassandra.schema.SchemaTestUtil.submit;
 import static org.junit.Assert.assertEquals;
 
 public class ClusterMetadataTestHelper
@@ -235,7 +236,7 @@ public class ClusterMetadataTestHelper
         }
         else
         {
-            Schema.instance.submit(cms -> {
+            submit(cms -> {
                 var km = cms.schema.getKeyspaceMetadata(ks);
                 var update = 
km.withSwapped(km.tables.withSwapped(km.tables.getNullable(table).unbuild()
                                                                      
.memtable(memtable)
@@ -309,10 +310,15 @@ public class ClusterMetadataTestHelper
     }
 
     public static NodeId register(InetAddressAndPort endpoint, String dc, 
String rack)
+    {
+        return register(endpoint, dc, rack, NodeVersion.CURRENT);
+    }
+
+    public static NodeId register(InetAddressAndPort endpoint, String dc, 
String rack, NodeVersion version)
     {
         try
         {
-            NodeId id = commit(new Register(addr(endpoint), new Location(dc, 
rack), NodeVersion.CURRENT)).directory.peerId(endpoint);
+            NodeId id = commit(new Register(addr(endpoint), new Location(dc, 
rack), version)).directory.peerId(endpoint);
             if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
                 RegistrationStatus.instance.onRegistration();
             return id;
@@ -891,11 +897,12 @@ public class ClusterMetadataTestHelper
             metadata = ClusterMetadataService.instance().commit(next);
         }
     }
+
     public static void addOrUpdateKeyspace(KeyspaceMetadata keyspace)
     {
         try
         {
-            SchemaTransformation transformation = (cm) -> 
cm.schema.getKeyspaces().withAddedOrUpdated(keyspace);
+            SchemaTransformation transformation = 
SchemaTestUtil.toTransformation(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(keyspace));
             commit(new AlterSchema(transformation));
         }
         catch (Exception e)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
index 786dc79388..a50c105998 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -55,7 +54,6 @@ import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.Unregister;
 import org.apache.cassandra.utils.CassandraVersion;
 
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES;
 import static org.junit.Assert.assertEquals;
 
 public class RegisterTest extends TestBaseImpl
@@ -103,8 +101,7 @@ public class RegisterTest extends TestBaseImpl
     public void serializationVersionCeilingTest() throws Throwable
     {
         try (Cluster cluster = builder().withNodes(1)
-                                        .createWithoutStarting();
-             WithProperties prop = new 
WithProperties().set(TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES, "true"))
+                                        .createWithoutStarting())
         {
             final String firstNodeEndpoint = "127.0.0.10";
             cluster.get(1).startup();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
index db2f2eca6d..156a957f94 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
@@ -53,7 +52,6 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
@@ -236,49 +234,6 @@ public class DecommissionTest extends TestBaseImpl
         }
     }
 
-    @Test
-    public void testMixedVersionBlockDecom() throws IOException {
-        try (Cluster cluster = builder().withNodes(3)
-                                        .withConfig(config -> 
config.with(GOSSIP, NETWORK))
-                                        .start())
-        {
-            cluster.get(3).nodetoolResult("decommission", 
"--force").asserts().success();
-
-            // make node2 run V0:
-            cluster.get(2).runOnInstance(() -> {
-                ClusterMetadata metadata = ClusterMetadata.current();
-
-                ClusterMetadataService.instance().commit(new 
Startup(metadata.myNodeId(),
-                                                                     
metadata.directory.getNodeAddresses(metadata.myNodeId()),
-                                                                     new 
NodeVersion(new CassandraVersion("4.0.0"),
-                                                                               
      Version.V0)));
-            });
-
-            // make node1 run V1:
-            cluster.get(1).runOnInstance(() -> {
-                ClusterMetadata metadata = ClusterMetadata.current();
-
-                ClusterMetadataService.instance().commit(new 
Startup(metadata.myNodeId(),
-                                                                     
metadata.directory.getNodeAddresses(metadata.myNodeId()),
-                                                                     new 
NodeVersion(new CassandraVersion("6.0.0"),
-                                                                               
      NodeVersion.CURRENT_METADATA_VERSION)));
-            });
-            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 3);
-            NodeToolResult res = cluster.get(2).nodetoolResult("decommission", 
"--force");
-            res.asserts().failure();
-            assertTrue(res.getStdout().contains("Upgrade in progress"));
-            cluster.get(2).runOnInstance(() -> {
-                ClusterMetadata metadata = ClusterMetadata.current();
-
-                ClusterMetadataService.instance().commit(new 
Startup(metadata.myNodeId(),
-                                                                     
metadata.directory.getNodeAddresses(metadata.myNodeId()),
-                                                                     new 
NodeVersion(new CassandraVersion("6.0.0"),
-                                                                               
      NodeVersion.CURRENT_METADATA_VERSION)));
-            });
-            cluster.get(2).nodetoolResult("decommission", 
"--force").asserts().success();
-        }
-    }
-
     @Test
     public void testPeersPostDecom() throws IOException
     {
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index 0350fca36b..750141674a 100644
--- 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -30,7 +30,6 @@ import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
 import org.apache.cassandra.gms.Gossiper;
@@ -63,7 +62,6 @@ public class InsertUpdateIfConditionTest extends CQLTester
     @BeforeClass
     public static void beforeClass()
     {
-        
CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES.setBoolean(true);
         Gossiper.instance.start(0);
     }
 
diff --git 
a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
 
b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
index d54b2e5e31..6fc4c640b5 100644
--- 
a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
+++ 
b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
@@ -147,14 +147,15 @@ public class SchemaChangeDuringRangeMovementTest extends 
CQLTester
         execute(String.format("CREATE KEYSPACE %s " +
                               "WITH REPLICATION = 
{'class':'SimpleStrategy','replication_factor':9}", RF9_KS4));
 
-        SchemaTransformation dropAllowed = (metadata_) -> 
metadata_.schema.getKeyspaces().without(RF9_KS4).without(RF9_KS3);
+        SchemaTransformation dropAllowed = 
SchemaTestUtil.toTransformation(metadata2 -> 
metadata2.schema.getKeyspaces().without(RF9_KS4).without(RF9_KS3));
+
         metadata = ClusterMetadataService.instance().commit(new 
AlterSchema(dropAllowed));
         assertFalse(metadata.schema.getKeyspaces().containsKeyspace(RF9_KS4));
         assertFalse(metadata.schema.getKeyspaces().containsKeyspace(RF9_KS3));
 
         try
         {
-            SchemaTransformation dropRejected = (metadata_) -> 
metadata_.schema.getKeyspaces().without(RF9_KS2).without(RF9_KS1);
+            SchemaTransformation dropRejected = 
SchemaTestUtil.toTransformation(metadata1 -> 
metadata1.schema.getKeyspaces().without(RF9_KS2).without(RF9_KS1));
             ClusterMetadataService.instance().commit(new 
AlterSchema(dropRejected));
             fail("Expected exception");
         }
diff --git a/test/unit/org/apache/cassandra/schema/SchemaTest.java 
b/test/unit/org/apache/cassandra/schema/SchemaTest.java
index fcf3c6997b..b290a843c8 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.cassandra.schema;
 
+import java.util.function.Function;
 import java.util.function.Predicate;
 
 import org.junit.Before;
@@ -28,12 +29,14 @@ import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.schema.AlterSchemaStatement;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class SchemaTest
 {
@@ -61,7 +64,7 @@ public class SchemaTest
         Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"),
                                   TableMetadata.minimal(KS_ONE, "modified2"));
         KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1), tables);
-        applyAndAssertTableMetadata((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm), true);
+        applyAndAssertTableMetadata(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm), true);
     }
 
     @Test
@@ -69,12 +72,12 @@ public class SchemaTest
     {
         // Create an empty keyspace
         KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1));
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
+        SchemaTestUtil.submit(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
 
         // Add two tables and verify that the resultant table metadata has the 
correct epoch
         Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), 
TableMetadata.minimal(KS_ONE, "modified2"));
         KeyspaceMetadata updated = ksm.withSwapped(tables);
-        applyAndAssertTableMetadata((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(updated), true);
+        applyAndAssertTableMetadata(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(updated), true);
     }
 
     @Test
@@ -82,12 +85,12 @@ public class SchemaTest
     {
         Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified"));
         KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1), tables);
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
+        SchemaTestUtil.submit(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
 
         // Add a second table and assert that its table metadata has the 
latest epoch, but that the
         // metadata of the other table stays unmodified
         KeyspaceMetadata updated = 
ksm.withSwapped(tables.with(TableMetadata.minimal(KS_ONE, "modified1")));
-        applyAndAssertTableMetadata((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(updated));
+        applyAndAssertTableMetadata(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(updated), false);
     }
 
     @Test
@@ -95,7 +98,7 @@ public class SchemaTest
     {
         Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified"));
         KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1), tables);
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
+        SchemaTestUtil.submit(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
 
         applyAndAssertTableMetadata(cql(KS_ONE, "CREATE TABLE %s.modified (k 
int PRIMARY KEY)"));
     }
@@ -105,16 +108,16 @@ public class SchemaTest
     {
         KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1));
         KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, 
KeyspaceParams.simple(1));
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2));
+        SchemaTestUtil.submit(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2));
 
         // Add two tables in each ks and verify that the resultant table 
metadata has the correct epoch
         Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), 
TableMetadata.minimal(KS_ONE, "modified2"));
         KeyspaceMetadata updated1 = ksm1.withSwapped(tables1);
         Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, "modified1"), 
TableMetadata.minimal(KS_TWO, "modified2"));
         KeyspaceMetadata updated2 = ksm2.withSwapped(tables2);
-        applyAndAssertTableMetadata((metadata) -> 
metadata.schema.getKeyspaces()
-                                                                         
.withAddedOrUpdated(updated1)
-                                                                         
.withAddedOrUpdated(updated2),
+        applyAndAssertTableMetadata(metadata -> metadata.schema.getKeyspaces()
+                                                               
.withAddedOrUpdated(updated1)
+                                                               
.withAddedOrUpdated(updated2),
                                     true);
     }
 
@@ -124,14 +127,14 @@ public class SchemaTest
     {
         KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, 
KeyspaceParams.simple(1));
         KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, 
KeyspaceParams.simple(1));
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2));
+        SchemaTestUtil.submit(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2));
 
         // Add two tables in each ks and verify that the resultant table 
metadata has the correct epoch
         Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, 
"unmodified1"), TableMetadata.minimal(KS_ONE, "unmodified2"));
         KeyspaceMetadata updated1 = ksm1.withSwapped(tables1);
         Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, 
"unmodified1"), TableMetadata.minimal(KS_TWO, "unmodified2"));
         KeyspaceMetadata updated2 = ksm2.withSwapped(tables2);
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(updated1).withAddedOrUpdated(updated2));
+        SchemaTestUtil.submit(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(updated1).withAddedOrUpdated(updated2));
 
         // Add a third table in one ks and assert that its table metadata has 
the latest epoch, but that the
         // metadata of the all other tables stays unmodified
@@ -180,15 +183,42 @@ public class SchemaTest
         applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified WITH 
comment = 'altered'"));
     }
 
+    @Test
+    public void schemaTransformationRejectionTest()
+    {
+        try
+        {
+            Schema.instance.submit(new SchemaTransformation()
+            {
+                @Override
+                public Keyspaces apply(ClusterMetadata metadata)
+                {
+                    return null;
+                }
+
+                @Override
+                public boolean compatibleWith(ClusterMetadata metadata)
+                {
+                    return false;
+                }
+            });
+            fail("Unsupported transformation should have failed");
+        }
+        catch (InvalidRequestException e)
+        {
+            assertTrue(e.getMessage().contains("Transformation rejected"));
+        }
+    }
+
     private void applyAndAssertTableMetadata(SchemaTransformation 
transformation)
     {
-        applyAndAssertTableMetadata(transformation, false);
+        applyAndAssertTableMetadata(transformation::apply, false);
     }
 
-    private void applyAndAssertTableMetadata(SchemaTransformation 
transformation, boolean onlyModified)
+    private void applyAndAssertTableMetadata(Function<ClusterMetadata, 
Keyspaces> transformation, boolean onlyModified)
     {
         Epoch before = ClusterMetadata.current().epoch;
-        Schema.instance.submit(transformation);
+        SchemaTestUtil.submit(transformation);
         Epoch after = ClusterMetadata.current().epoch;
         assertTrue(after.isDirectlyAfter(before));
         DistributedSchema schema = ClusterMetadata.current().schema;
@@ -207,11 +237,11 @@ public class SchemaTest
         });
     }
 
-    private static AlterSchemaStatement cql(String keyspace, String cql)
+    private static SchemaTransformation cql(String keyspace, String cql)
     {
         AlterSchemaStatement statement = (AlterSchemaStatement) 
QueryProcessor.parseStatement(String.format(cql, keyspace))
                                                                               
.prepare(ClientState.forInternalCalls());
         statement.setExecutionTimestamp(System.currentTimeMillis() * 1000);
-        return statement;
+        return SchemaTestUtil.toTransformation(statement::apply);
     }
 }
diff --git a/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java 
b/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java
index 9fb2aac882..f75cecd159 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.schema;
 
 import java.util.Collections;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,18 +40,7 @@ public class SchemaTestUtil
             throw new AlreadyExistsException(ksm.name);
 
         logger.info("Create new Keyspace: {}", ksm);
-        Schema.instance.submit(new SchemaTransformation()
-        {
-            public Keyspaces apply(ClusterMetadata metadata)
-            {
-                return metadata.schema.getKeyspaces().withAddedOrUpdated(ksm);
-            }
-
-            public String cql()
-            {
-                return "fake";
-            }
-        });
+        submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
     }
 
     public static void announceNewTable(TableMetadata cfm)
@@ -70,7 +60,7 @@ public class SchemaTestUtil
             throw new AlreadyExistsException(cfm.keyspace, cfm.name);
 
         logger.info("Create new table: {}", cfm);
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.with(cfm))));
+        submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.with(cfm))));
     }
 
     static void announceKeyspaceUpdate(KeyspaceMetadata ksm)
@@ -82,7 +72,7 @@ public class SchemaTestUtil
             throw new ConfigurationException(String.format("Cannot update non 
existing keyspace '%s'.", ksm.name));
 
         logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, 
ksm);
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
+        submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
     }
 
     public static void announceTableUpdate(TableMetadata updated)
@@ -97,7 +87,7 @@ public class SchemaTestUtil
         updated.validateCompatibility(current);
 
         logger.info("Update table '{}/{}' From {} To {}", current.keyspace, 
current.name, current, updated);
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.withSwapped(updated))));
+        submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.withSwapped(updated))));
     }
 
     static void announceKeyspaceDrop(String ksName)
@@ -107,12 +97,13 @@ public class SchemaTestUtil
             throw new ConfigurationException(String.format("Cannot drop non 
existing keyspace '%s'.", ksName));
 
         logger.info("Drop Keyspace '{}'", oldKsm.name);
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().without(ksName));
+        submit((metadata) -> metadata.schema.getKeyspaces().without(ksName));
     }
 
-    public static SchemaTransformation dropTable(String ksName, String cfName)
+    public static void announceTableDrop(String ksName, String cfName)
     {
-        return (metadata) -> {
+        logger.info("Drop table '{}/{}'", ksName, cfName);
+        submit((metadata) -> {
             Keyspaces schema = metadata.schema.getKeyspaces();
             KeyspaceMetadata ksm = schema.getNullable(ksName);
             TableMetadata tm = ksm != null ? 
ksm.getTableOrViewNullable(cfName) : null;
@@ -120,28 +111,63 @@ public class SchemaTestUtil
                 throw new ConfigurationException(String.format("Cannot drop 
non existing table '%s' in keyspace '%s'.", cfName, ksName));
 
             return 
schema.withAddedOrUpdated(ksm.withSwapped(ksm.tables.without(cfName)));
-        };
-    }
-
-    public static void announceTableDrop(String ksName, String cfName)
-    {
-        logger.info("Drop table '{}/{}'", ksName, cfName);
-        Schema.instance.submit(dropTable(ksName, cfName));
+        });
     }
 
     public static void addOrUpdateKeyspace(KeyspaceMetadata ksm)
     {
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
+        submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
     }
 
     @Deprecated(since = "CEP-21") // TODO remove this
     public static void addOrUpdateKeyspace(KeyspaceMetadata ksm, boolean 
locally)
     {
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
+        submit((metadata) -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
     }
 
     public static void dropKeyspaceIfExist(String ksName, boolean locally)
     {
-        Schema.instance.submit((metadata) -> 
metadata.schema.getKeyspaces().without(Collections.singletonList(ksName)));
+        submit((metadata) -> 
metadata.schema.getKeyspaces().without(Collections.singletonList(ksName)));
+    }
+
+    public static void submit(Function<ClusterMetadata, Keyspaces> 
transformation)
+    {
+        Schema.instance.submit(toTransformation(transformation));
+    }
+
+    /**
+     * Returns a {@link SchemaTransformation} with a {@code compatibleWith} 
implementation hardcoded to return true.
+     *
+     * This is intended for use in unit tests where there is only a single 
Cassandra version to consider, i.e. the
+     * transformation can be assumed compatible with whatever the current 
{@link ClusterMetadata} state is. It isn't
+     * suitable for dtests/upgrade tests etc. It is also worth noting that the 
{@code cql} method of the returned
+     * {@code SchemaTransformation} does not produce valid CQL, which 
precludes these transformations from being
+     * serialized/deserialized.
+     *
+     * @param transformation function to apply in order to modify schema
+     * @return SchemaTransformation instance that wraps the supplied function
+     */
+    public static SchemaTransformation 
toTransformation(Function<ClusterMetadata, Keyspaces> transformation)
+    {
+       return new SchemaTransformation()
+       {
+           @Override
+           public Keyspaces apply(ClusterMetadata metadata)
+           {
+               return transformation.apply(metadata);
+           }
+
+           @Override
+           public boolean compatibleWith(ClusterMetadata metadata)
+           {
+               return true;
+           }
+
+           @Override
+           public String cql()
+           {
+               return "fake";
+           }
+       };
     }
 }
diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java 
b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
index e6d7243a46..50e085fc6d 100644
--- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
+++ b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.tcm;
 
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.collect.ImmutableSet;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -28,14 +29,27 @@ import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 import org.apache.cassandra.distributed.test.log.CMSTestBase;
 import org.apache.cassandra.harry.model.TokenPlacementModel;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.SchemaTransformation;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
 import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.tcm.transformations.AlterSchema;
+import org.apache.cassandra.tcm.transformations.Assassinate;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+import org.apache.cassandra.utils.CassandraVersion;
 
+import static 
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr;
 import static 
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getLeavePlan;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ClusterMetadataTest
 {
@@ -106,4 +120,76 @@ public class ClusterMetadataTest
     {
         // todo
     }
+
+    @Test
+    public void testNewTransformationCommit()
+    {
+        newTransformationHelper(new CustomTransformation("TEST", new 
NewTransformation()));
+    }
+
+    @Test
+    public void testNewSchemaTransformation()
+    {
+        newTransformationHelper(new AlterSchema(new V5SchemaTransformation()));
+    }
+
+    private static void newTransformationHelper(Transformation transformation)
+    {
+        NodeId v4Node = null;
+        for (int i = 1; i <= 4; i++)
+        {
+            NodeId nodeId = ClusterMetadataTestHelper.register(addr(i), "dc0", 
"rack0", new NodeVersion(CassandraVersion.CASSANDRA_5_0, i == 4 ? Version.V4 : 
Version.V5));
+            if (i == 4)
+                v4Node = nodeId;
+            ClusterMetadataTestHelper.join(i, i);
+        }
+
+        try
+        {
+            ClusterMetadataService.instance().commit(transformation);
+            fail("Should not be able to commit V5 transformation in V4 
cluster");
+        }
+        catch (IllegalStateException e)
+        {
+            assertTrue(e.getMessage().contains("Transformation rejected"));
+        }
+        ClusterMetadataService.instance().commit(new Assassinate(v4Node, new 
UniformRangePlacement()));
+        ClusterMetadataService.instance().commit(transformation);
+    }
+
+    public static class V5SchemaTransformation implements SchemaTransformation
+    {
+        @Override
+        public Keyspaces apply(ClusterMetadata metadata)
+        {
+            return metadata.schema.getKeyspaces();
+        }
+
+        @Override
+        public boolean compatibleWith(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V5);
+        }
+    }
+
+    public static class NewTransformation implements Transformation
+    {
+        @Override
+        public Kind kind()
+        {
+            return Kind.CUSTOM;
+        }
+
+        @Override
+        public Result execute(ClusterMetadata metadata)
+        {
+            return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, 
ImmutableSet.of());
+        }
+
+        @Override
+        public boolean eligibleToCommit(ClusterMetadata metadata)
+        {
+            return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V5);
+        }
+    }
 }
diff --git 
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java 
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
index c7b9cbef4b..55470d5991 100644
--- 
a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
+++ 
b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java
@@ -57,13 +57,17 @@ import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaTransformation;
 import org.apache.cassandra.schema.SchemaTransformations;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JavaDriverUtils;
@@ -621,7 +625,20 @@ public class StressCQLSSTableWriter implements Closeable
             String keyspace = schemaStatement.keyspace();
 
             KeyspaceMetadata ksm = KeyspaceMetadata.create(keyspace, 
KeyspaceParams.simple(1));
-            Schema.instance.submit((metadata) ->  
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm));
+            Schema.instance.submit(new SchemaTransformation()
+            {
+                @Override
+                public Keyspaces apply(ClusterMetadata metadata)
+                {
+                    return 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm);
+                }
+
+                @Override
+                public boolean compatibleWith(ClusterMetadata metadata)
+                {
+                    return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+                }
+            });
 
             Types types = createTypes(keyspace, typeStatements);
             Schema.instance.submit(SchemaTransformations.addTypes(types, 
true));
@@ -640,7 +657,20 @@ public class StressCQLSSTableWriter implements Closeable
                                      .build();
             Tables tables = Tables.of(tableMetadata);
             KeyspaceMetadata updated = ksm.withSwapped(tables);
-            Schema.instance.submit((metadata) ->  
metadata.schema.getKeyspaces().withAddedOrUpdated(updated));
+            Schema.instance.submit(new SchemaTransformation()
+                                   {
+                                       @Override
+                                       public Keyspaces apply(ClusterMetadata 
metadata)
+                                       {
+                                           return 
metadata.schema.getKeyspaces().withAddedOrUpdated(updated);
+                                       }
+
+                                       @Override
+                                       public boolean 
compatibleWith(ClusterMetadata metadata)
+                                       {
+                                           return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0);
+                                       }
+                                   });
             Keyspace.setInitialized();
             Directories directories = new Directories(tableMetadata, 
directoryList.stream().map(f -> new Directories.DataDirectory(new 
org.apache.cassandra.io.util.File(f.toPath()))).collect(Collectors.toList()));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to