This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e1e39b074ac61f293d652b516a70c73b65c7ac3f Author: Marcus Eriksson <[email protected]> AuthorDate: Fri Nov 21 14:48:35 2025 +0100 Refactor the way we check if a transformation is allowed to be committed during upgrades Patch by marcuse and Sam Tunnicliffe; reviewed by Sam Tunnicliffe for CASSANDRA-21043 Co-authored-by: Sam Tunnicliffe <[email protected]> --- CHANGES.txt | 1 + .../config/CassandraRelevantProperties.java | 2 - .../statements/schema/AlterKeyspaceStatement.java | 7 + .../statements/schema/AlterTableStatement.java | 48 +++++++ .../cql3/statements/schema/AlterTypeStatement.java | 19 +++ .../cql3/statements/schema/AlterViewStatement.java | 7 + .../schema/CommentOnColumnStatement.java | 8 ++ .../schema/CommentOnKeyspaceStatement.java | 7 + .../statements/schema/CommentOnTableStatement.java | 7 + .../schema/CommentOnUserTypeFieldStatement.java | 7 + .../schema/CommentOnUserTypeStatement.java | 7 + .../cql3/statements/schema/CopyTableStatement.java | 7 + .../schema/CreateAggregateStatement.java | 7 + .../statements/schema/CreateFunctionStatement.java | 7 + .../statements/schema/CreateIndexStatement.java | 7 + .../statements/schema/CreateKeyspaceStatement.java | 7 + .../statements/schema/CreateTableStatement.java | 7 + .../statements/schema/CreateTriggerStatement.java | 7 + .../statements/schema/CreateTypeStatement.java | 7 + .../statements/schema/CreateViewStatement.java | 7 + .../statements/schema/DropAggregateStatement.java | 7 + .../statements/schema/DropFunctionStatement.java | 7 + .../cql3/statements/schema/DropIndexStatement.java | 7 + .../statements/schema/DropKeyspaceStatement.java | 7 + .../cql3/statements/schema/DropTableStatement.java | 6 + .../statements/schema/DropTriggerStatement.java | 7 + .../cql3/statements/schema/DropTypeStatement.java | 7 + .../cql3/statements/schema/DropViewStatement.java | 7 + .../schema/SecurityLabelOnColumnStatement.java | 6 + .../schema/SecurityLabelOnKeyspaceStatement.java | 7 + 
.../schema/SecurityLabelOnTableStatement.java | 7 + .../SecurityLabelOnUserTypeFieldStatement.java | 7 + .../schema/SecurityLabelOnUserTypeStatement.java | 7 + .../cassandra/io/sstable/CQLSSTableWriter.java | 17 ++- .../cassandra/schema/SchemaTransformation.java | 27 ++++ .../cassandra/schema/SchemaTransformations.java | 150 ++++++++++----------- .../cassandra/tcm/AbstractLocalProcessor.java | 9 +- .../org/apache/cassandra/tcm/Transformation.java | 140 +++++++++++-------- .../cassandra/tcm/transformations/AlterSchema.java | 5 + .../tcm/transformations/CustomTransformation.java | 6 + .../cassandra/tcm/transformations/Startup.java | 6 - .../tcm/transformations/TriggerSnapshot.java | 6 - .../tcm/transformations/cms/Initialize.java | 6 - .../distributed/test/PaxosRepairTest.java | 1 - .../distributed/test/log/CMSTestBase.java | 5 +- .../test/log/ClusterMetadataTestHelper.java | 15 ++- .../distributed/test/log/RegisterTest.java | 5 +- .../distributed/test/ring/DecommissionTest.java | 45 ------- .../operations/InsertUpdateIfConditionTest.java | 2 - .../SchemaChangeDuringRangeMovementTest.java | 5 +- .../org/apache/cassandra/schema/SchemaTest.java | 64 ++++++--- .../apache/cassandra/schema/SchemaTestUtil.java | 82 +++++++---- .../apache/cassandra/tcm/ClusterMetadataTest.java | 86 ++++++++++++ .../io/sstable/StressCQLSSTableWriter.java | 34 ++++- 54 files changed, 724 insertions(+), 264 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index f31d7c39f7..fa2b7a16c6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 5.1 * Improve isGossipOnlyMember and location lookup performance (CASSANDRA-21039) + * Refactor the way we check if a transformation is allowed to be committed during upgrades (CASSANDRA-21043) * Improve debug around paused and disabled compaction (CASSANDRA-20131,CASSANDRA-19728) * DiskUsageBroadcaster does not update usageInfo on node replacement (CASSANDRA-21033) * Reject PrepareJoin if tokens are already assigned (CASSANDRA-21006) diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index fc1c1c1462..99726f6b3d 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -579,8 +579,6 @@ public enum CassandraRelevantProperties // transactional cluster metadata relevant properties // TODO: not a fan of being forced to prefix these to satisfy the alphabetic ordering constraint // but it makes sense to group logically related properties together - - TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES("cassandra.allow_transformations_during_upgrades", "false"), /** * for testing purposes disable the automatic CMS reconfiguration after a bootstrap/replace/move operation */ diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java index 94f2504653..5ef2b68a9c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java @@ -45,6 +45,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.utils.FBUtilities; @@ -69,6 +70,12 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public Keyspaces apply(ClusterMetadata metadata) { 
attrs.validate(); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java index c7a57139d1..d9a1476a0b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.tcm.serialization.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,6 +177,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement { throw ire("Altering column types is no longer supported"); } + + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } } /** @@ -211,6 +218,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement ColumnMask.ensureEnabled(); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { @@ -301,6 +314,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement newColumns.forEach(c -> c.type.validate(state, "Column " + c.name)); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing columns", state); @@ -461,6 +480,12 @@ public abstract class AlterTableStatement 
extends AlterSchemaStatement return keyspace.withSwapped(keyspace.tables.withSwapped(builder.build())); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + private void dropColumn(KeyspaceMetadata keyspace, TableMetadata table, ColumnIdentifier column, boolean ifExists, TableMetadata.Builder builder) { ColumnMetadata currentColumn = table.getColumn(column); @@ -522,6 +547,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement this.ifColumnsExists = ifColumnsExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { Guardrails.alterTableEnabled.ensureEnabled("ALTER TABLE changing columns", state); @@ -640,6 +671,11 @@ public abstract class AlterTableStatement extends AlterSchemaStatement return next.unbuild().transactionalMigrationFrom(newMigrateFrom).build(); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { @@ -687,6 +723,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement super(keyspaceName, tableName, ifTableExists); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public KeyspaceMetadata apply(Epoch epoch, KeyspaceMetadata keyspace, TableMetadata table, ClusterMetadata metadata) { if (!DatabaseDescriptor.enableDropCompactStorage()) @@ -816,6 +858,12 @@ public abstract class AlterTableStatement extends AlterSchemaStatement } return keyspace; } + + 
@Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V5); + } } public static final class Raw extends CQLStatement.Raw diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java index fefe70e1c6..f089605338 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTypeStatement.java @@ -35,6 +35,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -128,6 +129,12 @@ public abstract class AlterTypeStatement extends AlterSchemaStatement this.state = state; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + UserType apply(KeyspaceMetadata keyspace, UserType userType) { if (type.isCounter()) @@ -186,6 +193,12 @@ public abstract class AlterTypeStatement extends AlterSchemaStatement this.renamedFields = renamedFields; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + UserType apply(KeyspaceMetadata keyspace, UserType userType) { List<String> dependentAggregates = @@ -233,6 +246,12 @@ public abstract class AlterTypeStatement extends AlterSchemaStatement super(keyspaceName, typeName, ifExists); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + UserType apply(KeyspaceMetadata keyspace, UserType userType) { throw ire("Altering field types is no longer supported"); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java index 831e46b0ca..6c13f78f5a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterViewStatement.java @@ -27,6 +27,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -55,6 +56,12 @@ public final class AlterViewStatement extends AlterSchemaStatement this.state = state; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java index 0bcb6088c8..329ee26e8f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnColumnStatement.java @@ -30,6 +30,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import 
org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -63,6 +64,13 @@ public final class CommentOnColumnStatement extends SchemaDescriptionStatement this.columnName = columnName; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java index 5def5af758..2d9766ea08 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnKeyspaceStatement.java @@ -27,6 +27,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -54,6 +55,12 @@ public final class CommentOnKeyspaceStatement extends SchemaDescriptionStatement super(keyspaceName, comment, DescriptionType.COMMENT); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java index 0c2ced7daf..3118533218 100644 --- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnTableStatement.java @@ -29,6 +29,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -60,6 +61,12 @@ public final class CommentOnTableStatement extends SchemaDescriptionStatement this.tableName = tableName; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java index 049b2de7e1..d77c19d94e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeFieldStatement.java @@ -29,6 +29,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -61,6 +62,12 @@ public final class CommentOnUserTypeFieldStatement extends SchemaDescriptionStat this.fieldName = fieldName; } + 
@Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java index c12d9918cc..9be3da5fec 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CommentOnUserTypeStatement.java @@ -28,6 +28,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -59,6 +60,12 @@ public final class CommentOnUserTypeStatement extends SchemaDescriptionStatement this.typeName = typeName; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java index bde06c35dc..9651c92a49 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CopyTableStatement.java @@ -57,6 +57,7 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import 
org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; /** @@ -111,6 +112,12 @@ public final class CopyTableStatement extends AlterSchemaStatement return new AuditLogContext(AuditLogEntryType.CREATE_TABLE_LIKE, targetKeyspace, targetTableName); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V5); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java index 6428d85073..ff5fa8c02a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateAggregateStatement.java @@ -47,6 +47,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -92,6 +93,12 @@ public final class CreateAggregateStatement extends AlterSchemaStatement this.ifNotExists = ifNotExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java index 20923687ff..7958d8e3e0 100644 --- 
a/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateFunctionStatement.java @@ -41,6 +41,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -82,6 +83,12 @@ public final class CreateFunctionStatement extends AlterSchemaStatement this.ifNotExists = ifNotExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + // TODO: replace affected aggregates !! public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java index addf9b68ef..f36ec54262 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateIndexStatement.java @@ -41,6 +41,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -122,6 +123,12 @@ public final class CreateIndexStatement extends AlterSchemaStatement this.state = state; } + @Override + public boolean 
compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java index 1df860d123..a035da52da 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java @@ -37,6 +37,7 @@ import org.apache.cassandra.schema.KeyspaceParams.Option; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -61,6 +62,12 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement return super.cql(); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java index e452d36e11..328cb7542e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTableStatement.java @@ -73,6 +73,7 @@ import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; 
import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -135,6 +136,12 @@ public final class CreateTableStatement extends AlterSchemaStatement return super.cql(); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public Keyspaces apply(ClusterMetadata metadata) { Keyspaces schema = metadata.schema.getKeyspaces(); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java index 5441cb8952..ccbd4852eb 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTriggerStatement.java @@ -29,6 +29,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -66,6 +67,12 @@ public final class CreateTriggerStatement extends AlterSchemaStatement } } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java index bd63310f18..375e7e832e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java 
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateTypeStatement.java @@ -35,6 +35,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -76,6 +77,12 @@ public final class CreateTypeStatement extends AlterSchemaStatement } } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public Keyspaces apply(ClusterMetadata metadata) { Keyspaces schema = metadata.schema.getKeyspaces(); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java index 4a4d903a33..3b204b236c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateViewStatement.java @@ -58,6 +58,7 @@ import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -129,6 +130,12 @@ public final class CreateViewStatement extends AlterSchemaStatement this.state = state; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + 
@Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java index e61332ee12..16beede039 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropAggregateStatement.java @@ -36,6 +36,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -65,6 +66,12 @@ public final class DropAggregateStatement extends AlterSchemaStatement this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public Keyspaces apply(ClusterMetadata metadata) { String name = diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java index 0353964683..c157e42107 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropFunctionStatement.java @@ -36,6 +36,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -66,6 +67,12 @@ public final class 
DropFunctionStatement extends AlterSchemaStatement this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java index 06cbc4be99..0fde7c8fc9 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropIndexStatement.java @@ -27,6 +27,7 @@ import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -43,6 +44,12 @@ public final class DropIndexStatement extends AlterSchemaStatement this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java index 0ab0447f6b..744c768010 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropKeyspaceStatement.java @@ -31,6 +31,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.schema.TableMetadata; 
import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -44,6 +45,12 @@ public final class DropKeyspaceStatement extends AlterSchemaStatement this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java index c9e6516c57..79b416ad26 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropTableStatement.java @@ -30,6 +30,7 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.sequences.DropAccordTable.TableReference; import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.PrepareDropAccordTable; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -72,6 +73,11 @@ public final class DropTableStatement extends AlterSchemaStatement return InProgressSequences.finishInProgressSequences(ref); } + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + public Keyspaces apply(ClusterMetadata metadata) { Guardrails.dropTruncateTableEnabled.ensureEnabled(state); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java index 15007c3b1e..77d003b9e5 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropTriggerStatement.java @@ -25,6 +25,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -43,6 +44,12 @@ public final class DropTriggerStatement extends AlterSchemaStatement this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java index 105c8f5db8..ba904e4370 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropTypeStatement.java @@ -32,6 +32,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; import org.apache.cassandra.transport.Event.SchemaChange; @@ -55,6 +56,12 @@ public final class DropTypeStatement extends AlterSchemaStatement 
this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + // TODO: expand types into tuples in all dropped columns of all tables @Override public Keyspaces apply(ClusterMetadata metadata) diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java index 121575503c..0c07dc7907 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/DropViewStatement.java @@ -26,6 +26,7 @@ import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -42,6 +43,12 @@ public final class DropViewStatement extends AlterSchemaStatement this.ifExists = ifExists; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java index 22cb81e6e9..9724f336a8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnColumnStatement.java @@ -30,6 +30,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import 
org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -88,6 +89,11 @@ public final class SecurityLabelOnColumnStatement extends SchemaDescriptionState this.provider = provider; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } @Override public Keyspaces apply(ClusterMetadata metadata) diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java index e9ab8625d1..1fa2051bbc 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnKeyspaceStatement.java @@ -27,6 +27,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; @@ -80,6 +81,12 @@ public final class SecurityLabelOnKeyspaceStatement extends SchemaDescriptionSta this.provider = provider; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java index 969014c8ab..9640640be2 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnTableStatement.java @@ -29,6 +29,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableParams; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -85,6 +86,12 @@ public final class SecurityLabelOnTableStatement extends SchemaDescriptionStatem this.provider = provider; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java index fee6fa6b2f..e4aca100d5 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeFieldStatement.java @@ -30,6 +30,7 @@ import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; 
import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -87,6 +88,12 @@ public final class SecurityLabelOnUserTypeFieldStatement extends SchemaDescripti this.provider = provider; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java index cd30e456a3..0a4d3ee4e9 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/SecurityLabelOnUserTypeStatement.java @@ -28,6 +28,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.Event.SchemaChange; import org.apache.cassandra.transport.Event.SchemaChange.Change; import org.apache.cassandra.transport.Event.SchemaChange.Target; @@ -84,6 +85,12 @@ public final class SecurityLabelOnUserTypeStatement extends SchemaDescriptionSta this.provider = provider; } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V8); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index c1043a9fb7..8236451eb8 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ 
b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -84,6 +84,7 @@ import org.apache.cassandra.schema.Views; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.AlterSchema; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; @@ -857,7 +858,21 @@ public class CQLSSTableWriter implements Closeable private void commitKeyspaceMetadata(KeyspaceMetadata keyspaceMetadata) { - SchemaTransformation schemaTransformation = metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(keyspaceMetadata); + SchemaTransformation schemaTransformation = new SchemaTransformation() + { + @Override + public Keyspaces apply(ClusterMetadata metadata) + { + return metadata.schema.getKeyspaces().withAddedOrUpdated(keyspaceMetadata); + } + + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + }; + ClusterMetadataService.instance().commit(new AlterSchema(schemaTransformation)); } diff --git a/src/java/org/apache/cassandra/schema/SchemaTransformation.java b/src/java/org/apache/cassandra/schema/SchemaTransformation.java index 75ab3767ab..38b66434bb 100644 --- a/src/java/org/apache/cassandra/schema/SchemaTransformation.java +++ b/src/java/org/apache/cassandra/schema/SchemaTransformation.java @@ -46,6 +46,33 @@ public interface SchemaTransformation */ Keyspaces apply(ClusterMetadata metadata); + /** + * Should return true if the implementing schema transformation is compatible with the given cluster metadata. + * Specifically, it should use the Directory to ensure that all current members are capable of both deserializing + * and enacting the transformation. 
If this returns false, the {@link org.apache.cassandra.tcm.log.Entry} containing + * the schema transformation will be blocked from being committed to the metadata log. + * + * For example, if an entirely new schema transformation is introduced, a new metadata serialization version + * (see {@link Version}) must be added and the implementation of {@code compatibleWith} in that schema + * transformation must return true only if the {@code commonSerializationVersion} of + * {@link ClusterMetadata#directory} is at least equal to the new {@code Version}. + * + * Similar constraints apply if a new feature is added to an existing transformation. Depending on the specifics of + * the feature implementation, a new serialization {@link Version} may not be required but the transformation should + * be able to signal that a minimum Cassandra version is required before the transformation can be successfully + * performed. + * An illustration of this scenario could be adding a new compression implementation in a minor release. No + * serialization changes are required as compression params are represented by a simple {@code Map<String,String>} + * but if a table is created or altered to use a new compression class during the minor upgrade, the unupgraded + * nodes may not be able to enact the transformation, requiring them to complete the upgrade synchronously. To + * mitigate this, the {@code compatibleWith} implementation in + * {@link org.apache.cassandra.cql3.statements.schema.CreateTableStatement} and + * {@link org.apache.cassandra.cql3.statements.schema.AlterTableStatement} would inspect the + * {@link org.apache.cassandra.utils.CassandraVersion} of {@code clusterMinVersion} provided by + * {@link ClusterMetadata#directory} to ensure all members are sufficiently capable. 
+ */ + boolean compatibleWith(ClusterMetadata metadata); + default String cql() { return "null"; diff --git a/src/java/org/apache/cassandra/schema/SchemaTransformations.java b/src/java/org/apache/cassandra/schema/SchemaTransformations.java index c6996199e5..f0b9fe6105 100644 --- a/src/java/org/apache/cassandra/schema/SchemaTransformations.java +++ b/src/java/org/apache/cassandra/schema/SchemaTransformations.java @@ -23,10 +23,8 @@ import java.util.Optional; import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.exceptions.AlreadyExistsException; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.cql3.CQLStatement; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; @@ -35,14 +33,6 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.invalidReq */ public class SchemaTransformations { - public static SchemaTransformation fromCql(String cql) - { - CQLStatement statement = QueryProcessor.getStatement(cql, ClientState.forInternalCalls()); - if (!(statement instanceof SchemaTransformation)) - throw new IllegalArgumentException("Can not deserialize schema transformation"); - return (SchemaTransformation) statement; - } - /** * Creates a schema transformation that adds the provided keyspace. 
* @@ -54,19 +44,29 @@ public class SchemaTransformations */ public static SchemaTransformation addKeyspace(KeyspaceMetadata keyspace, boolean ignoreIfExists) { - return (metadata) -> + return new SchemaTransformation() { - Keyspaces schema = metadata.schema.getKeyspaces(); - KeyspaceMetadata existing = schema.getNullable(keyspace.name); - if (existing != null) + @Override + public Keyspaces apply(ClusterMetadata metadata) { - if (ignoreIfExists) - return schema; + Keyspaces schema = metadata.schema.getKeyspaces(); + KeyspaceMetadata existing = schema.getNullable(keyspace.name); + if (existing != null) + { + if (ignoreIfExists) + return schema; + + throw new AlreadyExistsException(keyspace.name); + } - throw new AlreadyExistsException(keyspace.name); + return schema.withAddedOrUpdated(keyspace); } - return schema.withAddedOrUpdated(keyspace); + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } }; } @@ -81,84 +81,74 @@ public class SchemaTransformations */ public static SchemaTransformation addTable(TableMetadata table, boolean ignoreIfExists) { - return (metadata) -> + return new SchemaTransformation() { - Keyspaces schema = metadata.schema.getKeyspaces(); - KeyspaceMetadata keyspace = schema.getNullable(table.keyspace); - if (keyspace == null) - throw invalidRequest("Keyspace '%s' doesn't exist", table.keyspace); - - if (keyspace.hasTable(table.name)) + @Override + public Keyspaces apply(ClusterMetadata metadata) { - if (ignoreIfExists) - return schema; + Keyspaces schema = metadata.schema.getKeyspaces(); + KeyspaceMetadata keyspace = schema.getNullable(table.keyspace); + if (keyspace == null) + throw invalidRequest("Keyspace '%s' doesn't exist", table.keyspace); - throw new AlreadyExistsException(table.keyspace, table.name); - } + if (keyspace.hasTable(table.name)) + { + if (ignoreIfExists) + return schema; - table.validate(); + throw new 
AlreadyExistsException(table.keyspace, table.name); + } + + table.validate(); + + return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table))); + } - return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.tables.with(table))); + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } }; } public static SchemaTransformation addTypes(Types toAdd, boolean ignoreIfExists) { - return (metadata) -> + return new SchemaTransformation() { - Keyspaces schema = metadata.schema.getKeyspaces(); - if (toAdd.isEmpty()) - return schema; + @Override + public Keyspaces apply(ClusterMetadata metadata) + { + Keyspaces schema = metadata.schema.getKeyspaces(); + if (toAdd.isEmpty()) + return schema; - String keyspaceName = toAdd.iterator().next().keyspace; - KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); - if (null == keyspace) - throw invalidRequest("Keyspace '%s' doesn't exist", keyspaceName); + String keyspaceName = toAdd.iterator().next().keyspace; + KeyspaceMetadata keyspace = schema.getNullable(keyspaceName); + if (null == keyspace) + throw invalidRequest("Keyspace '%s' doesn't exist", keyspaceName); - Types types = keyspace.types; - for (UserType type : toAdd) - { - if (types.containsType(type.name)) + Types types = keyspace.types; + for (UserType type : toAdd) { - if (ignoreIfExists) - continue; + if (types.containsType(type.name)) + { + if (ignoreIfExists) + continue; - throw new ConfigurationException("Type " + type + " already exists in " + keyspaceName); - } + throw new ConfigurationException("Type " + type + " already exists in " + keyspaceName); + } - types = types.with(type); + types = types.with(type); + } + return schema.withAddedOrReplaced(keyspace.withSwapped(types)); } - return schema.withAddedOrReplaced(keyspace.withSwapped(types)); - }; - } - - /** - * Creates a schema transformation that adds the provided view. 
- * - * @param view the view to add. - * @param ignoreIfExists if {@code true}, the transformation is a no-op if a view of the same name than - * {@code view} already exists in the schema the transformation is applied on. Otherwise, - * the transformation throws an {@link AlreadyExistsException} in that case. - * @return the created transformation. - */ - public static SchemaTransformation addView(ViewMetadata view, boolean ignoreIfExists) - { - return (metadata) -> - { - Keyspaces schema = metadata.schema.getKeyspaces(); - KeyspaceMetadata keyspace = schema.getNullable(view.keyspace()); - if (keyspace == null) - throw invalidRequest("Cannot add view to non existing keyspace '%s'", view.keyspace()); - if (keyspace.hasView(view.name())) + @Override + public boolean compatibleWith(ClusterMetadata metadata) { - if (ignoreIfExists) - return schema; - - throw new AlreadyExistsException(view.keyspace(), view.name()); + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); } - - return schema.withAddedOrUpdated(keyspace.withSwapped(keyspace.views.with(view))); }; } @@ -184,6 +174,12 @@ public class SchemaTransformations return Optional.of(generation); } + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + @Override public Keyspaces apply(ClusterMetadata metadata) { diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 72d46a5af5..0f42bae462 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -23,7 +23,6 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import 
org.apache.cassandra.tcm.log.LogState; @@ -65,11 +64,11 @@ public abstract class AbstractLocalProcessor implements Processor } Transformation.Result result; - if (!CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES.getBoolean() && - !transform.allowDuringUpgrades() && - previous.metadataSerializationUpgradeInProgress()) + if (!transform.eligibleToCommit(previous)) { - result = new Transformation.Rejected(INVALID, "Upgrade in progress, can't commit " + transform); + result = new Transformation.Rejected(INVALID, "Transformation rejected, can't commit " + transform + + " it not supported with cluster common serialization version " + previous.directory.commonSerializationVersion + + " and min/max serialization versions " + previous.directory.clusterMinVersion + "/" + previous.directory.clusterMaxVersion); } else { diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java index fc519e6edb..200d19e990 100644 --- a/src/java/org/apache/cassandra/tcm/Transformation.java +++ b/src/java/org/apache/cassandra/tcm/Transformation.java @@ -69,6 +69,8 @@ import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS; import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS; import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration; +import static org.apache.cassandra.tcm.serialization.Version.V0; + /** * Implementations should be pure transformations from one ClusterMetadata state to another. They are likely to be * replayed during startup to rebuild the node's current state and so should be free of side effects and should not @@ -91,9 +93,28 @@ public interface Transformation */ Result execute(ClusterMetadata metadata); - default boolean allowDuringUpgrades() + /** + * Returns true if the type of Transformation (as identified by its KIND) is eligible to be committed to the + * metadata log given the state of the supplied ClusterMetadata. 
This is to ensure that newly introduced + * transformations cannot be added to the log until all current members are able to read and enact them. + * + * The default implementation works in conjunction with {@link Kind#introducedIn}. It checks whether the lowest + * common serialization version in the cluster is at least equal to the minimum version required to deserialize + * transformations of that Kind. + * + * Specific Transformation implementations are free to consider other factors if they override this method. For + * example, they may inspect the Cassandra release versions as well as or instead of supported serialization + * versions. + * + * Note: this check pertains only to the KIND of Transformation. Even if the Kind is eligible to be applied, a + * specific instance of that kind may still be rejected based on its parameters and/or current cluster state. + * e.g. Transforms with a Kind of PREPARE_JOIN are eligible to be committed as that Kind's introducedIn value + * is Version.V0, but a specific PrepareJoin may still return a Rejected result if the target node is already + * in a JOINED state. 
+ */ + default boolean eligibleToCommit(ClusterMetadata metadata) { - return false; + return kind().introducedIn.isEqualOrBefore(metadata.directory.commonSerializationVersion); } static Success success(ClusterMetadata.Transformer transformer, LockedRanges.AffectedRanges affectedRanges) @@ -194,69 +215,81 @@ public interface Transformation enum Kind { - PRE_INITIALIZE_CMS(0, () -> PreInitialize.serializer), - INITIALIZE_CMS(1, () -> Initialize.serializer), - FORCE_SNAPSHOT(2, () -> ForceSnapshot.serializer), - TRIGGER_SNAPSHOT(3, () -> TriggerSnapshot.serializer), - SCHEMA_CHANGE(4, () -> AlterSchema.serializer), - REGISTER(5, () -> Register.serializer), - UNREGISTER(6, () -> Unregister.serializer), - - UNSAFE_JOIN(7, () -> UnsafeJoin.serializer), - PREPARE_JOIN(8, () -> PrepareJoin.serializer), - START_JOIN(9, () -> PrepareJoin.StartJoin.serializer), - MID_JOIN(10, () -> PrepareJoin.MidJoin.serializer), - FINISH_JOIN(11, () -> PrepareJoin.FinishJoin.serializer), - - PREPARE_MOVE(12, () -> PrepareMove.serializer), - START_MOVE(13, () -> PrepareMove.StartMove.serializer), - MID_MOVE(14, () -> PrepareMove.MidMove.serializer), - FINISH_MOVE(15, () -> PrepareMove.FinishMove.serializer), - - PREPARE_LEAVE(16, () -> PrepareLeave.serializer), - START_LEAVE(17, () -> PrepareLeave.StartLeave.serializer), - MID_LEAVE(18, () -> PrepareLeave.MidLeave.serializer), - FINISH_LEAVE(19, () -> PrepareLeave.FinishLeave.serializer), - ASSASSINATE(20, () -> Assassinate.serializer), - - PREPARE_REPLACE(21, () -> PrepareReplace.serializer), - START_REPLACE(22, () -> PrepareReplace.StartReplace.serializer), - MID_REPLACE(23, () -> PrepareReplace.MidReplace.serializer), - FINISH_REPLACE(24, () -> PrepareReplace.FinishReplace.serializer), - - CANCEL_SEQUENCE(25, () -> CancelInProgressSequence.serializer), + PRE_INITIALIZE_CMS(0, V0, () -> PreInitialize.serializer), + INITIALIZE_CMS(1, V0, () -> Initialize.serializer), + FORCE_SNAPSHOT(2, V0, () -> ForceSnapshot.serializer), + 
TRIGGER_SNAPSHOT(3, V0, () -> TriggerSnapshot.serializer), + SCHEMA_CHANGE(4, V0, () -> AlterSchema.serializer), + REGISTER(5, V0, () -> Register.serializer), + UNREGISTER(6, V0, () -> Unregister.serializer), + + UNSAFE_JOIN(7, V0, () -> UnsafeJoin.serializer), + PREPARE_JOIN(8, V0, () -> PrepareJoin.serializer), + START_JOIN(9, V0, () -> PrepareJoin.StartJoin.serializer), + MID_JOIN(10, V0, () -> PrepareJoin.MidJoin.serializer), + FINISH_JOIN(11, V0, () -> PrepareJoin.FinishJoin.serializer), + + PREPARE_MOVE(12, V0, () -> PrepareMove.serializer), + START_MOVE(13, V0, () -> PrepareMove.StartMove.serializer), + MID_MOVE(14, V0, () -> PrepareMove.MidMove.serializer), + FINISH_MOVE(15, V0, () -> PrepareMove.FinishMove.serializer), + + PREPARE_LEAVE(16, V0, () -> PrepareLeave.serializer), + START_LEAVE(17, V0, () -> PrepareLeave.StartLeave.serializer), + MID_LEAVE(18, V0, () -> PrepareLeave.MidLeave.serializer), + FINISH_LEAVE(19, V0, () -> PrepareLeave.FinishLeave.serializer), + ASSASSINATE(20, V0, () -> Assassinate.serializer), + + PREPARE_REPLACE(21, V0, () -> PrepareReplace.serializer), + START_REPLACE(22, V0, () -> PrepareReplace.StartReplace.serializer), + MID_REPLACE(23, V0, () -> PrepareReplace.MidReplace.serializer), + FINISH_REPLACE(24, V0, () -> PrepareReplace.FinishReplace.serializer), + + CANCEL_SEQUENCE(25, V0, () -> CancelInProgressSequence.serializer), @Deprecated(since = "CEP-21") - START_ADD_TO_CMS(26, () -> StartAddToCMS.serializer), + START_ADD_TO_CMS(26, V0, () -> StartAddToCMS.serializer), @Deprecated(since = "CEP-21") - FINISH_ADD_TO_CMS(27, () -> FinishAddToCMS.serializer), + FINISH_ADD_TO_CMS(27, V0, () -> FinishAddToCMS.serializer), @Deprecated(since = "CEP-21") - REMOVE_FROM_CMS(28, () -> RemoveFromCMS.serializer), + REMOVE_FROM_CMS(28, V0, () -> RemoveFromCMS.serializer), - STARTUP(29, () -> Startup.serializer), + STARTUP(29, V0, () -> Startup.serializer), - CUSTOM(30, () -> CustomTransformation.serializer), + CUSTOM(30, V0, () -> 
CustomTransformation.serializer), - PREPARE_SIMPLE_CMS_RECONFIGURATION(31, () -> PrepareCMSReconfiguration.Simple.serializer), - PREPARE_COMPLEX_CMS_RECONFIGURATION(32, () -> PrepareCMSReconfiguration.Complex.serializer), - ADVANCE_CMS_RECONFIGURATION(33, () -> AdvanceCMSReconfiguration.serializer), - CANCEL_CMS_RECONFIGURATION(34, () -> CancelCMSReconfiguration.serializer), - ALTER_TOPOLOGY(35, () -> AlterTopology.serializer), + PREPARE_SIMPLE_CMS_RECONFIGURATION(31, V0, () -> PrepareCMSReconfiguration.Simple.serializer), + PREPARE_COMPLEX_CMS_RECONFIGURATION(32, V0, () -> PrepareCMSReconfiguration.Complex.serializer), + ADVANCE_CMS_RECONFIGURATION(33, V0, () -> AdvanceCMSReconfiguration.serializer), + CANCEL_CMS_RECONFIGURATION(34, V0, () -> CancelCMSReconfiguration.serializer), + ALTER_TOPOLOGY(35, V0, () -> AlterTopology.serializer), - UPDATE_AVAILABILITY(36, () -> ReconfigureAccordFastPath.serializer), + UPDATE_AVAILABILITY(36, Version.MIN_ACCORD_VERSION, () -> ReconfigureAccordFastPath.serializer), - BEGIN_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(37, () -> BeginConsensusMigrationForTableAndRange.serializer), - MAYBE_FINISH_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(38, () -> MaybeFinishConsensusMigrationForTableAndRange.serializer), - ACCORD_MARK_STALE(39, () -> AccordMarkStale.serializer), - ACCORD_MARK_REJOINING(40, () -> AccordMarkRejoining.serializer), - PREPARE_DROP_ACCORD_TABLE(41, () -> PrepareDropAccordTable.serializer), - FINISH_DROP_ACCORD_TABLE(42, () -> FinishDropAccordTable.serializer), - ACCORD_MARK_HARD_REMOVED(43, () -> AccordMarkHardRemoved.serializer), + BEGIN_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(37, Version.MIN_ACCORD_VERSION, () -> BeginConsensusMigrationForTableAndRange.serializer), + MAYBE_FINISH_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(38, Version.MIN_ACCORD_VERSION, () -> MaybeFinishConsensusMigrationForTableAndRange.serializer), + ACCORD_MARK_STALE(39, Version.MIN_ACCORD_VERSION, () -> AccordMarkStale.serializer), + 
ACCORD_MARK_REJOINING(40, Version.MIN_ACCORD_VERSION, () -> AccordMarkRejoining.serializer), + PREPARE_DROP_ACCORD_TABLE(41, Version.MIN_ACCORD_VERSION, () -> PrepareDropAccordTable.serializer), + FINISH_DROP_ACCORD_TABLE(42, Version.MIN_ACCORD_VERSION, () -> FinishDropAccordTable.serializer), + ACCORD_MARK_HARD_REMOVED(43, Version.MIN_ACCORD_VERSION, () -> AccordMarkHardRemoved.serializer), ; - private final Supplier<AsymmetricMetadataSerializer<Transformation, ? extends Transformation>> serializer; + /** + * The metadata serialization version which was current (as defined by + * {@link org.apache.cassandra.tcm.membership.NodeVersion#CURRENT_METADATA_VERSION}) when a transformation was + * first introduced. During an upgrade this is used to prevent new transformations being committed by upgraded + * nodes as nodes still running the older version will not be able to deserialize them. + * + * This should generally always be left as the first version the transformation was added and not be bumped when + * the implementation is modified. Adding Version guards to the associated {@link AsymmetricMetadataSerializer} + * is the way to handle this type of evolution. + */ + public final Version introducedIn; public final int id; + private final Supplier<AsymmetricMetadataSerializer<Transformation, ? extends Transformation>> serializer; + private static final Kind[] idToKindMap; static @@ -271,10 +304,11 @@ public interface Transformation idToKindMap = idMap; } - Kind(int id, Supplier<AsymmetricMetadataSerializer<Transformation, ? extends Transformation>> serializer) + Kind(int id, Version introducedIn, Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer) { this.serializer = serializer; this.id = id; + this.introducedIn = introducedIn; } public static Kind fromId(int id) diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java index 09c5b68bb5..f2a8389c60 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java +++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java @@ -98,6 +98,11 @@ public class AlterSchema implements Transformation return Kind.SCHEMA_CHANGE; } + public boolean eligibleToCommit(ClusterMetadata metadata) + { + return schemaTransformation.compatibleWith(metadata); + } + @Override public final Result execute(ClusterMetadata prev) { diff --git a/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java b/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java index bcacba652d..bbe09699d5 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java +++ b/src/java/org/apache/cassandra/tcm/transformations/CustomTransformation.java @@ -109,6 +109,12 @@ public class CustomTransformation implements Transformation return Kind.CUSTOM; } + @Override + public boolean eligibleToCommit(ClusterMetadata metadata) + { + return child.eligibleToCommit(metadata); + } + public Result execute(ClusterMetadata prev) { return child.execute(prev); diff --git a/src/java/org/apache/cassandra/tcm/transformations/Startup.java b/src/java/org/apache/cassandra/tcm/transformations/Startup.java index 7a2ecb34c5..a1abfdbff4 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/Startup.java +++ b/src/java/org/apache/cassandra/tcm/transformations/Startup.java @@ -126,12 +126,6 @@ public class Startup implements Transformation '}'; } - @Override - public boolean allowDuringUpgrades() - { - return true; - } - public static void maybeExecuteStartupTransformation(NodeId localNodeId) { Directory 
directory = ClusterMetadata.current().directory; diff --git a/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java b/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java index 6c53a6618b..98f0f94f27 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java +++ b/src/java/org/apache/cassandra/tcm/transformations/TriggerSnapshot.java @@ -48,12 +48,6 @@ public class TriggerSnapshot implements Transformation return Kind.TRIGGER_SNAPSHOT; } - @Override - public boolean allowDuringUpgrades() - { - return true; - } - @Override public Result execute(ClusterMetadata prev) { diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java index 9564372c6f..11c2ed4d31 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java @@ -61,12 +61,6 @@ public class Initialize extends ForceSnapshot super(baseState); } - @Override - public boolean allowDuringUpgrades() - { - return true; - } - public Kind kind() { return Kind.INITIALIZE_CMS; diff --git a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java index d18f3a6f06..ad750b0b7d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java @@ -123,7 +123,6 @@ public class PaxosRepairTest extends TestBaseImpl { CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION.setBoolean(false); CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true); - CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES.setBoolean(true); // for paxosRepairVersionGate DatabaseDescriptor.daemonInitialization(); } diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java index c89b52c047..bfec054085 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java @@ -29,6 +29,7 @@ import org.apache.cassandra.locator.MetaStrategy; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.SchemaTestUtil; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Commit; @@ -97,9 +98,7 @@ public class CMSTestBase } }); - service.commit(new AlterSchema((cm) -> { - return cm.schema.getKeyspaces().with(Keyspaces.of(KeyspaceMetadata.create("test", rf.asKeyspaceParams()))); - })); + service.commit(new AlterSchema(SchemaTestUtil.toTransformation(metadata -> metadata.schema.getKeyspaces().with(Keyspaces.of(KeyspaceMetadata.create("test", rf.asKeyspaceParams())))))); } public void close() diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index 0acd6895b8..43c147c255 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -54,8 +54,8 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.schema.MemtableParams; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.SchemaTestUtil; import 
org.apache.cassandra.schema.SchemaTransformation; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.accord.AccordStaleReplicas; @@ -100,6 +100,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Throwables; +import static org.apache.cassandra.schema.SchemaTestUtil.submit; import static org.junit.Assert.assertEquals; public class ClusterMetadataTestHelper @@ -235,7 +236,7 @@ public class ClusterMetadataTestHelper } else { - Schema.instance.submit(cms -> { + submit(cms -> { var km = cms.schema.getKeyspaceMetadata(ks); var update = km.withSwapped(km.tables.withSwapped(km.tables.getNullable(table).unbuild() .memtable(memtable) @@ -309,10 +310,15 @@ public class ClusterMetadataTestHelper } public static NodeId register(InetAddressAndPort endpoint, String dc, String rack) + { + return register(endpoint, dc, rack, NodeVersion.CURRENT); + } + + public static NodeId register(InetAddressAndPort endpoint, String dc, String rack, NodeVersion version) { try { - NodeId id = commit(new Register(addr(endpoint), new Location(dc, rack), NodeVersion.CURRENT)).directory.peerId(endpoint); + NodeId id = commit(new Register(addr(endpoint), new Location(dc, rack), version)).directory.peerId(endpoint); if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) RegistrationStatus.instance.onRegistration(); return id; @@ -891,11 +897,12 @@ public class ClusterMetadataTestHelper metadata = ClusterMetadataService.instance().commit(next); } } + public static void addOrUpdateKeyspace(KeyspaceMetadata keyspace) { try { - SchemaTransformation transformation = (cm) -> cm.schema.getKeyspaces().withAddedOrUpdated(keyspace); + SchemaTransformation transformation = SchemaTestUtil.toTransformation(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(keyspace)); commit(new AlterSchema(transformation)); } catch (Exception e) diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java index 786dc79388..a50c105998 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java @@ -31,7 +31,6 @@ import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.NetworkTopology; -import org.apache.cassandra.distributed.shared.WithProperties; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.locator.InetAddressAndPort; @@ -55,7 +54,6 @@ import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.Unregister; import org.apache.cassandra.utils.CassandraVersion; -import static org.apache.cassandra.config.CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES; import static org.junit.Assert.assertEquals; public class RegisterTest extends TestBaseImpl @@ -103,8 +101,7 @@ public class RegisterTest extends TestBaseImpl public void serializationVersionCeilingTest() throws Throwable { try (Cluster cluster = builder().withNodes(1) - .createWithoutStarting(); - WithProperties prop = new WithProperties().set(TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES, "true")) + .createWithoutStarting()) { final String firstNodeEndpoint = "127.0.0.10"; cluster.get(1).startup(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java index db2f2eca6d..156a957f94 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java +++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/DecommissionTest.java @@ -41,7 +41,6 @@ import org.apache.cassandra.distributed.Constants; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; -import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.test.TestBaseImpl; @@ -53,7 +52,6 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; -import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; @@ -236,49 +234,6 @@ public class DecommissionTest extends TestBaseImpl } } - @Test - public void testMixedVersionBlockDecom() throws IOException { - try (Cluster cluster = builder().withNodes(3) - .withConfig(config -> config.with(GOSSIP, NETWORK)) - .start()) - { - cluster.get(3).nodetoolResult("decommission", "--force").asserts().success(); - - // make node2 run V0: - cluster.get(2).runOnInstance(() -> { - ClusterMetadata metadata = ClusterMetadata.current(); - - ClusterMetadataService.instance().commit(new Startup(metadata.myNodeId(), - metadata.directory.getNodeAddresses(metadata.myNodeId()), - new NodeVersion(new CassandraVersion("4.0.0"), - Version.V0))); - }); - - // make node1 run V1: - cluster.get(1).runOnInstance(() -> { - ClusterMetadata metadata = ClusterMetadata.current(); - - ClusterMetadataService.instance().commit(new Startup(metadata.myNodeId(), - metadata.directory.getNodeAddresses(metadata.myNodeId()), - new NodeVersion(new 
CassandraVersion("6.0.0"), - NodeVersion.CURRENT_METADATA_VERSION))); - }); - ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1), 3); - NodeToolResult res = cluster.get(2).nodetoolResult("decommission", "--force"); - res.asserts().failure(); - assertTrue(res.getStdout().contains("Upgrade in progress")); - cluster.get(2).runOnInstance(() -> { - ClusterMetadata metadata = ClusterMetadata.current(); - - ClusterMetadataService.instance().commit(new Startup(metadata.myNodeId(), - metadata.directory.getNodeAddresses(metadata.myNodeId()), - new NodeVersion(new CassandraVersion("6.0.0"), - NodeVersion.CURRENT_METADATA_VERSION))); - }); - cluster.get(2).nodetoolResult("decommission", "--force").asserts().success(); - } - } - @Test public void testPeersPostDecom() throws IOException { diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java index 0350fca36b..750141674a 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java @@ -30,7 +30,6 @@ import org.junit.runners.Parameterized; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.Util; -import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.Duration; import org.apache.cassandra.gms.Gossiper; @@ -63,7 +62,6 @@ public class InsertUpdateIfConditionTest extends CQLTester @BeforeClass public static void beforeClass() { - CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES.setBoolean(true); Gossiper.instance.start(0); } diff --git a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java index d54b2e5e31..6fc4c640b5 
100644 --- a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java +++ b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java @@ -147,14 +147,15 @@ public class SchemaChangeDuringRangeMovementTest extends CQLTester execute(String.format("CREATE KEYSPACE %s " + "WITH REPLICATION = {'class':'SimpleStrategy','replication_factor':9}", RF9_KS4)); - SchemaTransformation dropAllowed = (metadata_) -> metadata_.schema.getKeyspaces().without(RF9_KS4).without(RF9_KS3); + SchemaTransformation dropAllowed = SchemaTestUtil.toTransformation(metadata2 -> metadata2.schema.getKeyspaces().without(RF9_KS4).without(RF9_KS3)); + metadata = ClusterMetadataService.instance().commit(new AlterSchema(dropAllowed)); assertFalse(metadata.schema.getKeyspaces().containsKeyspace(RF9_KS4)); assertFalse(metadata.schema.getKeyspaces().containsKeyspace(RF9_KS3)); try { - SchemaTransformation dropRejected = (metadata_) -> metadata_.schema.getKeyspaces().without(RF9_KS2).without(RF9_KS1); + SchemaTransformation dropRejected = SchemaTestUtil.toTransformation(metadata1 -> metadata1.schema.getKeyspaces().without(RF9_KS2).without(RF9_KS1)); ClusterMetadataService.instance().commit(new AlterSchema(dropRejected)); fail("Expected exception"); } diff --git a/test/unit/org/apache/cassandra/schema/SchemaTest.java b/test/unit/org/apache/cassandra/schema/SchemaTest.java index fcf3c6997b..b290a843c8 100644 --- a/test/unit/org/apache/cassandra/schema/SchemaTest.java +++ b/test/unit/org/apache/cassandra/schema/SchemaTest.java @@ -18,6 +18,7 @@ */ package org.apache.cassandra.schema; +import java.util.function.Function; import java.util.function.Predicate; import org.junit.Before; @@ -28,12 +29,14 @@ import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.schema.AlterSchemaStatement; +import 
org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SchemaTest { @@ -61,7 +64,7 @@ public class SchemaTest Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), TableMetadata.minimal(KS_ONE, "modified2")); KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1), tables); - applyAndAssertTableMetadata((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm), true); + applyAndAssertTableMetadata(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm), true); } @Test @@ -69,12 +72,12 @@ public class SchemaTest { // Create an empty keyspace KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); + SchemaTestUtil.submit(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); // Add two tables and verify that the resultant table metadata has the correct epoch Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), TableMetadata.minimal(KS_ONE, "modified2")); KeyspaceMetadata updated = ksm.withSwapped(tables); - applyAndAssertTableMetadata((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated), true); + applyAndAssertTableMetadata(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated), true); } @Test @@ -82,12 +85,12 @@ public class SchemaTest { Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified")); KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1), tables); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); + SchemaTestUtil.submit(metadata -> 
metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); // Add a second table and assert that its table metadata has the latest epoch, but that the // metadata of the other table stays unmodified KeyspaceMetadata updated = ksm.withSwapped(tables.with(TableMetadata.minimal(KS_ONE, "modified1"))); - applyAndAssertTableMetadata((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated)); + applyAndAssertTableMetadata(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated), false); } @Test @@ -95,7 +98,7 @@ public class SchemaTest { Tables tables = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified")); KeyspaceMetadata ksm = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1), tables); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); + SchemaTestUtil.submit(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); applyAndAssertTableMetadata(cql(KS_ONE, "CREATE TABLE %s.modified (k int PRIMARY KEY)")); } @@ -105,16 +108,16 @@ public class SchemaTest { KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)); KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, KeyspaceParams.simple(1)); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2)); + SchemaTestUtil.submit(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2)); // Add two tables in each ks and verify that the resultant table metadata has the correct epoch Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, "modified1"), TableMetadata.minimal(KS_ONE, "modified2")); KeyspaceMetadata updated1 = ksm1.withSwapped(tables1); Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, "modified1"), TableMetadata.minimal(KS_TWO, "modified2")); KeyspaceMetadata updated2 = ksm2.withSwapped(tables2); - applyAndAssertTableMetadata((metadata) -> metadata.schema.getKeyspaces() - 
.withAddedOrUpdated(updated1) - .withAddedOrUpdated(updated2), + applyAndAssertTableMetadata(metadata -> metadata.schema.getKeyspaces() + .withAddedOrUpdated(updated1) + .withAddedOrUpdated(updated2), true); } @@ -124,14 +127,14 @@ public class SchemaTest { KeyspaceMetadata ksm1 = KeyspaceMetadata.create(KS_ONE, KeyspaceParams.simple(1)); KeyspaceMetadata ksm2 = KeyspaceMetadata.create(KS_TWO, KeyspaceParams.simple(1)); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2)); + SchemaTestUtil.submit(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm1).withAddedOrUpdated(ksm2)); // Add two tables in each ks and verify that the resultant table metadata has the correct epoch Tables tables1 = Tables.of(TableMetadata.minimal(KS_ONE, "unmodified1"), TableMetadata.minimal(KS_ONE, "unmodified2")); KeyspaceMetadata updated1 = ksm1.withSwapped(tables1); Tables tables2 = Tables.of(TableMetadata.minimal(KS_TWO, "unmodified1"), TableMetadata.minimal(KS_TWO, "unmodified2")); KeyspaceMetadata updated2 = ksm2.withSwapped(tables2); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated1).withAddedOrUpdated(updated2)); + SchemaTestUtil.submit(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated1).withAddedOrUpdated(updated2)); // Add a third table in one ks and assert that its table metadata has the latest epoch, but that the // metadata of the all other tables stays unmodified @@ -180,15 +183,42 @@ public class SchemaTest applyAndAssertTableMetadata(cql(KS_ONE, "ALTER TABLE %s.modified WITH comment = 'altered'")); } + @Test + public void schemaTransformationRejectionTest() + { + try + { + Schema.instance.submit(new SchemaTransformation() + { + @Override + public Keyspaces apply(ClusterMetadata metadata) + { + return null; + } + + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return false; + } + }); + fail("Unsupported 
transformation should have failed"); + } + catch (InvalidRequestException e) + { + assertTrue(e.getMessage().contains("Transformation rejected")); + } + } + private void applyAndAssertTableMetadata(SchemaTransformation transformation) { - applyAndAssertTableMetadata(transformation, false); + applyAndAssertTableMetadata(transformation::apply, false); } - private void applyAndAssertTableMetadata(SchemaTransformation transformation, boolean onlyModified) + private void applyAndAssertTableMetadata(Function<ClusterMetadata, Keyspaces> transformation, boolean onlyModified) { Epoch before = ClusterMetadata.current().epoch; - Schema.instance.submit(transformation); + SchemaTestUtil.submit(transformation); Epoch after = ClusterMetadata.current().epoch; assertTrue(after.isDirectlyAfter(before)); DistributedSchema schema = ClusterMetadata.current().schema; @@ -207,11 +237,11 @@ public class SchemaTest }); } - private static AlterSchemaStatement cql(String keyspace, String cql) + private static SchemaTransformation cql(String keyspace, String cql) { AlterSchemaStatement statement = (AlterSchemaStatement) QueryProcessor.parseStatement(String.format(cql, keyspace)) .prepare(ClientState.forInternalCalls()); statement.setExecutionTimestamp(System.currentTimeMillis() * 1000); - return statement; + return SchemaTestUtil.toTransformation(statement::apply); } } diff --git a/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java b/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java index 9fb2aac882..f75cecd159 100644 --- a/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java +++ b/test/unit/org/apache/cassandra/schema/SchemaTestUtil.java @@ -19,6 +19,7 @@ package org.apache.cassandra.schema; import java.util.Collections; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,18 +40,7 @@ public class SchemaTestUtil throw new AlreadyExistsException(ksm.name); logger.info("Create new Keyspace: {}", ksm); - 
Schema.instance.submit(new SchemaTransformation() - { - public Keyspaces apply(ClusterMetadata metadata) - { - return metadata.schema.getKeyspaces().withAddedOrUpdated(ksm); - } - - public String cql() - { - return "fake"; - } - }); + submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); } public static void announceNewTable(TableMetadata cfm) @@ -70,7 +60,7 @@ public class SchemaTestUtil throw new AlreadyExistsException(cfm.keyspace, cfm.name); logger.info("Create new table: {}", cfm); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.with(cfm)))); + submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.with(cfm)))); } static void announceKeyspaceUpdate(KeyspaceMetadata ksm) @@ -82,7 +72,7 @@ public class SchemaTestUtil throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, ksm); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); + submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); } public static void announceTableUpdate(TableMetadata updated) @@ -97,7 +87,7 @@ public class SchemaTestUtil updated.validateCompatibility(current); logger.info("Update table '{}/{}' From {} To {}", current.keyspace, current.name, current, updated); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.withSwapped(updated)))); + submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm.withSwapped(ksm.tables.withSwapped(updated)))); } static void announceKeyspaceDrop(String ksName) @@ -107,12 +97,13 @@ public class SchemaTestUtil throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); logger.info("Drop Keyspace '{}'", oldKsm.name); - 
Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().without(ksName)); + submit((metadata) -> metadata.schema.getKeyspaces().without(ksName)); } - public static SchemaTransformation dropTable(String ksName, String cfName) + public static void announceTableDrop(String ksName, String cfName) { - return (metadata) -> { + logger.info("Drop table '{}/{}'", ksName, cfName); + submit((metadata) -> { Keyspaces schema = metadata.schema.getKeyspaces(); KeyspaceMetadata ksm = schema.getNullable(ksName); TableMetadata tm = ksm != null ? ksm.getTableOrViewNullable(cfName) : null; @@ -120,28 +111,63 @@ public class SchemaTestUtil throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); return schema.withAddedOrUpdated(ksm.withSwapped(ksm.tables.without(cfName))); - }; - } - - public static void announceTableDrop(String ksName, String cfName) - { - logger.info("Drop table '{}/{}'", ksName, cfName); - Schema.instance.submit(dropTable(ksName, cfName)); + }); } public static void addOrUpdateKeyspace(KeyspaceMetadata ksm) { - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); + submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); } @Deprecated(since = "CEP-21") // TODO remove this public static void addOrUpdateKeyspace(KeyspaceMetadata ksm, boolean locally) { - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); + submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); } public static void dropKeyspaceIfExist(String ksName, boolean locally) { - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().without(Collections.singletonList(ksName))); + submit((metadata) -> metadata.schema.getKeyspaces().without(Collections.singletonList(ksName))); + } + + public static void submit(Function<ClusterMetadata, Keyspaces> transformation) + { + 
Schema.instance.submit(toTransformation(transformation)); + } + + /** + * Returns a {@link SchemaTransformation} with a {@code compatibleWith} implementation hardcoded to return true. + * + * This is intended for use in unit tests where there is only a single Cassandra version to consider, i.e. the + * transformation can be assumed compatible with whatever the current {@link ClusterMetadata} state is. It isn't + * suitable for dtests/upgrade tests etc. It is also worth noting that the {@code cql} method of the returned + * {@code SchemaTransformation} does not produce valid CQL, which precludes these transformations from being + * serialized/deserialized. + * + * @param transformation function to apply in order to modify schema + * @return SchemaTransformation instance that wraps the supplied function + */ + public static SchemaTransformation toTransformation(Function<ClusterMetadata, Keyspaces> transformation) + { + return new SchemaTransformation() + { + @Override + public Keyspaces apply(ClusterMetadata metadata) + { + return transformation.apply(metadata); + } + + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return true; + } + + @Override + public String cql() + { + return "fake"; + } + }; } } diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java index e6d7243a46..50e085fc6d 100644 --- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java +++ b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.tcm; import java.util.concurrent.ExecutionException; +import com.google.common.collect.ImmutableSet; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -28,14 +29,27 @@ import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.distributed.test.log.CMSTestBase; import 
org.apache.cassandra.harry.model.TokenPlacementModel; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.SchemaTransformation; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.UniformRangePlacement; import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; +import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.tcm.transformations.AlterSchema; +import org.apache.cassandra.tcm.transformations.Assassinate; +import org.apache.cassandra.tcm.transformations.CustomTransformation; +import org.apache.cassandra.utils.CassandraVersion; +import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr; import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getLeavePlan; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ClusterMetadataTest { @@ -106,4 +120,76 @@ public class ClusterMetadataTest { // todo } + + @Test + public void testNewTransformationCommit() + { + newTransformationHelper(new CustomTransformation("TEST", new NewTransformation())); + } + + @Test + public void testNewSchemaTransformation() + { + newTransformationHelper(new AlterSchema(new V5SchemaTransformation())); + } + + private static void newTransformationHelper(Transformation transformation) + { + NodeId v4Node = null; + for (int i = 1; i <= 4; i++) + { + NodeId nodeId = ClusterMetadataTestHelper.register(addr(i), "dc0", "rack0", new NodeVersion(CassandraVersion.CASSANDRA_5_0, i == 4 ? 
Version.V4 : Version.V5)); + if (i == 4) + v4Node = nodeId; + ClusterMetadataTestHelper.join(i, i); + } + + try + { + ClusterMetadataService.instance().commit(transformation); + fail("Should not be able to commit V5 transformation in V4 cluster"); + } + catch (IllegalStateException e) + { + assertTrue(e.getMessage().contains("Transformation rejected")); + } + ClusterMetadataService.instance().commit(new Assassinate(v4Node, new UniformRangePlacement())); + ClusterMetadataService.instance().commit(transformation); + } + + public static class V5SchemaTransformation implements SchemaTransformation + { + @Override + public Keyspaces apply(ClusterMetadata metadata) + { + return metadata.schema.getKeyspaces(); + } + + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V5); + } + } + + public static class NewTransformation implements Transformation + { + @Override + public Kind kind() + { + return Kind.CUSTOM; + } + + @Override + public Result execute(ClusterMetadata metadata) + { + return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, ImmutableSet.of()); + } + + @Override + public boolean eligibleToCommit(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V5); + } + } } diff --git a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java index c7b9cbef4b..55470d5991 100644 --- a/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java +++ b/tools/stress/src/org/apache/cassandra/io/sstable/StressCQLSSTableWriter.java @@ -57,13 +57,17 @@ import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Keyspaces; 
import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaTransformation; import org.apache.cassandra.schema.SchemaTransformations; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.JavaDriverUtils; @@ -621,7 +625,20 @@ public class StressCQLSSTableWriter implements Closeable String keyspace = schemaStatement.keyspace(); KeyspaceMetadata ksm = KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(ksm)); + Schema.instance.submit(new SchemaTransformation() + { + @Override + public Keyspaces apply(ClusterMetadata metadata) + { + return metadata.schema.getKeyspaces().withAddedOrUpdated(ksm); + } + + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + }); Types types = createTypes(keyspace, typeStatements); Schema.instance.submit(SchemaTransformations.addTypes(types, true)); @@ -640,7 +657,20 @@ public class StressCQLSSTableWriter implements Closeable .build(); Tables tables = Tables.of(tableMetadata); KeyspaceMetadata updated = ksm.withSwapped(tables); - Schema.instance.submit((metadata) -> metadata.schema.getKeyspaces().withAddedOrUpdated(updated)); + Schema.instance.submit(new SchemaTransformation() + { + @Override + public Keyspaces apply(ClusterMetadata metadata) + { + return metadata.schema.getKeyspaces().withAddedOrUpdated(updated); + } + + @Override + public boolean compatibleWith(ClusterMetadata metadata) + { + return 
metadata.directory.commonSerializationVersion.isAtLeast(Version.V0); + } + }); Keyspace.setInitialized(); Directories directories = new Directories(tableMetadata, directoryList.stream().map(f -> new Directories.DataDirectory(new org.apache.cassandra.io.util.File(f.toPath()))).collect(Collectors.toList())); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
