twalthr commented on code in PR #27327:
URL: https://github.com/apache/flink/pull/27327#discussion_r2611404652
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
ResolvedCatalogMaterializedTable oldTable, ConvertContext
context) {
return new SchemaAddConverter(oldTable, context);
}
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {}
+ }
+
+ /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+ public static class SqlAlterMaterializedTableModifySchemaConverter
+ extends SqlAlterMaterializedTableSchemaConverter<
+ SqlAlterMaterializedTableModifySchema> {
+ @Override
+ protected SchemaConverter createSchemaConverter(
+ ResolvedCatalogMaterializedTable oldMaterializedTable,
ConvertContext context) {
+ return new SchemaModifyConverter(oldMaterializedTable, context);
+ }
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {
+ Map<String, Column> map = new HashMap<>();
+ for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+ final Column column = oldSchema.getColumn(i).get();
+ map.put(column.getName(), column);
+ }
+
+ List<Schema.UnresolvedColumn> columns = newSchema.getColumns();
Review Comment:
Please import all the column types directly (e.g. `UnresolvedColumn`) instead of qualifying them with `Schema.` throughout.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
ResolvedCatalogMaterializedTable oldTable, ConvertContext
context) {
return new SchemaAddConverter(oldTable, context);
}
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {}
+ }
+
+ /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+ public static class SqlAlterMaterializedTableModifySchemaConverter
+ extends SqlAlterMaterializedTableSchemaConverter<
+ SqlAlterMaterializedTableModifySchema> {
+ @Override
+ protected SchemaConverter createSchemaConverter(
+ ResolvedCatalogMaterializedTable oldMaterializedTable,
ConvertContext context) {
+ return new SchemaModifyConverter(oldMaterializedTable, context);
+ }
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {
+ Map<String, Column> map = new HashMap<>();
+ for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+ final Column column = oldSchema.getColumn(i).get();
+ map.put(column.getName(), column);
+ }
+
+ List<Schema.UnresolvedColumn> columns = newSchema.getColumns();
+ for (int i = 0; i < columns.size(); i++) {
+ Schema.UnresolvedColumn col = columns.get(i);
+ final String name = col.getName();
+ if (map.containsKey(name) && !columnTypeKept(col,
map.get(name))) {
+ throw new ValidationException(
+ String.format(
+ "Failed to execute ALTER MATERIALIZED
TABLE statement.\n"
+ + "Changing of %s column '%s' to
%s column is not supported.",
+ getColumnKind(map.get(name).getClass()),
+ name,
+ getColumnKind(col.getClass())));
+ }
+ if (col instanceof Schema.UnresolvedComputedColumn) {
+ continue;
+ }
+
+ LogicalType dataType =
+
createDataType(context.getCatalogManager().getDataTypeFactory(), col);
+ LogicalType oldDataType =
map.get(col.getName()).getDataType().getLogicalType();
+ if (!LogicalTypeCasts.supportsImplicitCast(oldDataType,
dataType)) {
Review Comment:
Is this new logic or was this previous behavior?
##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java:
##########
@@ -471,7 +471,130 @@ void testAlterMaterializedTableAddWatermark() {
}
@Test
- void testAlterMaterializedTableAddDistribution() {
+ void testModifySingleColumn() {
+ sql("alter materialized table mt1 modify new_column string comment
'new_column docs'")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `NEW_COLUMN` STRING COMMENT 'new_column
docs'\n"
+ + ")");
+ sql("alter materialized table mt1 modify new_column string comment
'new_column docs'")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `NEW_COLUMN` STRING COMMENT 'new_column
docs'\n"
+ + ")");
+ sql("alter materialized table mt1 modify new_column string comment
'new_column docs' first")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `NEW_COLUMN` STRING COMMENT 'new_column
docs' FIRST\n"
+ + ")");
+ sql("alter materialized table mt1 modify new_column string comment
'new_column docs' after id")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `NEW_COLUMN` STRING COMMENT 'new_column
docs' AFTER `ID`\n"
+ + ")");
+ // modify column type
+ sql("alter materialized table mt1 modify new_column array<string not
null> not null")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `NEW_COLUMN` ARRAY< STRING NOT NULL > NOT
NULL\n"
+ + ")");
+
+ // modify compute column
+ sql("alter materialized table mt1 modify col_int as col_a - col_b
after col_b")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `COL_INT` AS (`COL_A` - `COL_B`) AFTER
`COL_B`\n"
+ + ")");
+ // modify metadata column
+ sql("alter materialized table mt1 modify col_int int metadata from
'mk1' virtual comment 'comment_metadata' after col_b")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `COL_INT` INTEGER METADATA FROM 'mk1'
VIRTUAL COMMENT 'comment_metadata' AFTER `COL_B`\n"
+ + ")");
+
+ // modify nested column
+ sql("alter materialized table mt1 modify row_column.f0 int not null
comment 'change nullability'")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `ROW_COLUMN`.`F0` INTEGER NOT NULL
COMMENT 'change nullability'\n"
+ + ")");
+
+ // modify nested column, shift position
+ sql("alter materialized table mt1 modify row_column.f0 int after
row_column.f2")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " `ROW_COLUMN`.`F0` INTEGER AFTER
`ROW_COLUMN`.`F2`\n"
+ + ")");
+ }
+
+ @Test
+ void testModifyWatermark() {
+ sql("alter materialized table mt1 modify watermark for ts as ts")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " WATERMARK FOR `TS` AS `TS`\n"
+ + ")");
+ sql("alter materialized table mt1 modify watermark for ts as ts -
interval '1' second")
+ .ok(
+ "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+ + " WATERMARK FOR `TS` AS (`TS` - INTERVAL
'1' SECOND)\n"
+ + ")");
+ sql("alter materialized table default_database.mt1 modify watermark
for ts as ts - interval '1' second")
+ .ok(
+ "ALTER MATERIALIZED TABLE `DEFAULT_DATABASE`.`MT1`
MODIFY (\n"
+ + " WATERMARK FOR `TS` AS (`TS` - INTERVAL
'1' SECOND)\n"
+ + ")");
+ sql("alter materialized table default_catalog.default_database.mt1
modify watermark for ts as ts - interval '1' second")
+ .ok(
+ "ALTER MATERIALIZED TABLE
`DEFAULT_CATALOG`.`DEFAULT_DATABASE`.`MT1` MODIFY (\n"
+ + " WATERMARK FOR `TS` AS (`TS` - INTERVAL
'1' SECOND)\n"
+ + ")");
+
+ sql("alter materialized table default_catalog.default_database.mt1
modify (\n"
+ + "watermark for ts as ts - interval '1' second,\n"
+ + "^watermark^ for f1 as now()\n"
+ + ")")
+ .fails("Multiple WATERMARK statements is not supported yet.");
Review Comment:
```suggestion
.fails("Multiple WATERMARK declarations are not supported
yet.");
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
ResolvedCatalogMaterializedTable oldTable, ConvertContext
context) {
return new SchemaAddConverter(oldTable, context);
}
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {}
+ }
+
+ /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+ public static class SqlAlterMaterializedTableModifySchemaConverter
+ extends SqlAlterMaterializedTableSchemaConverter<
+ SqlAlterMaterializedTableModifySchema> {
+ @Override
+ protected SchemaConverter createSchemaConverter(
+ ResolvedCatalogMaterializedTable oldMaterializedTable,
ConvertContext context) {
+ return new SchemaModifyConverter(oldMaterializedTable, context);
+ }
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {
+ Map<String, Column> map = new HashMap<>();
+ for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+ final Column column = oldSchema.getColumn(i).get();
+ map.put(column.getName(), column);
+ }
+
+ List<Schema.UnresolvedColumn> columns = newSchema.getColumns();
+ for (int i = 0; i < columns.size(); i++) {
+ Schema.UnresolvedColumn col = columns.get(i);
+ final String name = col.getName();
+ if (map.containsKey(name) && !columnTypeKept(col,
map.get(name))) {
+ throw new ValidationException(
+ String.format(
+ "Failed to execute ALTER MATERIALIZED
TABLE statement.\n"
+ + "Changing of %s column '%s' to
%s column is not supported.",
+ getColumnKind(map.get(name).getClass()),
+ name,
+ getColumnKind(col.getClass())));
+ }
+ if (col instanceof Schema.UnresolvedComputedColumn) {
+ continue;
+ }
+
+ LogicalType dataType =
+
createDataType(context.getCatalogManager().getDataTypeFactory(), col);
+ LogicalType oldDataType =
map.get(col.getName()).getDataType().getLogicalType();
+ if (!LogicalTypeCasts.supportsImplicitCast(oldDataType,
dataType)) {
+ throw new ValidationException(
+ String.format(
+ "Failed to execute ALTER MATERIALIZED
TABLE statement.\n"
+ + "Column '%s' with type %s can
not be changed to type %s.",
+ col.getName(), oldDataType, dataType));
+ }
+ }
+ }
+
+ private LogicalType createDataType(
+ DataTypeFactory dataTypeFactory, Schema.UnresolvedColumn col) {
+ if (col instanceof Schema.UnresolvedMetadataColumn) {
+ return dataTypeFactory
+ .createDataType(((Schema.UnresolvedMetadataColumn)
col).getDataType())
+ .getLogicalType();
+ } else if (col instanceof Schema.UnresolvedPhysicalColumn) {
+ return dataTypeFactory
+ .createDataType(((Schema.UnresolvedPhysicalColumn)
col).getDataType())
+ .getLogicalType();
+ } else {
+ throw new ValidationException("Mot expected column type " +
col.getClass());
Review Comment:
```suggestion
throw new ValidationException("Unexpected column type " +
col.getClass());
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
ResolvedCatalogMaterializedTable oldTable, ConvertContext
context) {
return new SchemaAddConverter(oldTable, context);
}
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {}
+ }
+
+ /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+ public static class SqlAlterMaterializedTableModifySchemaConverter
+ extends SqlAlterMaterializedTableSchemaConverter<
+ SqlAlterMaterializedTableModifySchema> {
+ @Override
+ protected SchemaConverter createSchemaConverter(
+ ResolvedCatalogMaterializedTable oldMaterializedTable,
ConvertContext context) {
+ return new SchemaModifyConverter(oldMaterializedTable, context);
+ }
+
+ @Override
+ protected void validateChanges(
+ ResolvedSchema oldSchema, Schema newSchema, ConvertContext
context) {
+ Map<String, Column> map = new HashMap<>();
+ for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+ final Column column = oldSchema.getColumn(i).get();
+ map.put(column.getName(), column);
+ }
+
+ List<Schema.UnresolvedColumn> columns = newSchema.getColumns();
Review Comment:
```suggestion
List<UnresolvedColumn> columns = newSchema.getColumns();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]