twalthr commented on code in PR #27327:
URL: https://github.com/apache/flink/pull/27327#discussion_r2611404652


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
                 ResolvedCatalogMaterializedTable oldTable, ConvertContext 
context) {
             return new SchemaAddConverter(oldTable, context);
         }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {}
+    }
+
+    /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+    public static class SqlAlterMaterializedTableModifySchemaConverter
+            extends SqlAlterMaterializedTableSchemaConverter<
+                    SqlAlterMaterializedTableModifySchema> {
+        @Override
+        protected SchemaConverter createSchemaConverter(
+                ResolvedCatalogMaterializedTable oldMaterializedTable, 
ConvertContext context) {
+            return new SchemaModifyConverter(oldMaterializedTable, context);
+        }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {
+            Map<String, Column> map = new HashMap<>();
+            for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+                final Column column = oldSchema.getColumn(i).get();
+                map.put(column.getName(), column);
+            }
+
+            List<Schema.UnresolvedColumn> columns = newSchema.getColumns();

Review Comment:
   Please import all column types (e.g. `UnresolvedColumn`) directly instead of qualifying them with `Schema.`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
                 ResolvedCatalogMaterializedTable oldTable, ConvertContext 
context) {
             return new SchemaAddConverter(oldTable, context);
         }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {}
+    }
+
+    /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+    public static class SqlAlterMaterializedTableModifySchemaConverter
+            extends SqlAlterMaterializedTableSchemaConverter<
+                    SqlAlterMaterializedTableModifySchema> {
+        @Override
+        protected SchemaConverter createSchemaConverter(
+                ResolvedCatalogMaterializedTable oldMaterializedTable, 
ConvertContext context) {
+            return new SchemaModifyConverter(oldMaterializedTable, context);
+        }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {
+            Map<String, Column> map = new HashMap<>();
+            for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+                final Column column = oldSchema.getColumn(i).get();
+                map.put(column.getName(), column);
+            }
+
+            List<Schema.UnresolvedColumn> columns = newSchema.getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                Schema.UnresolvedColumn col = columns.get(i);
+                final String name = col.getName();
+                if (map.containsKey(name) && !columnTypeKept(col, 
map.get(name))) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Failed to execute ALTER MATERIALIZED 
TABLE statement.\n"
+                                            + "Changing of %s column '%s' to 
%s column is not supported.",
+                                    getColumnKind(map.get(name).getClass()),
+                                    name,
+                                    getColumnKind(col.getClass())));
+                }
+                if (col instanceof Schema.UnresolvedComputedColumn) {
+                    continue;
+                }
+
+                LogicalType dataType =
+                        
createDataType(context.getCatalogManager().getDataTypeFactory(), col);
+                LogicalType oldDataType = 
map.get(col.getName()).getDataType().getLogicalType();
+                if (!LogicalTypeCasts.supportsImplicitCast(oldDataType, 
dataType)) {

Review Comment:
   Is this new logic or was this previous behavior?



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java:
##########
@@ -471,7 +471,130 @@ void testAlterMaterializedTableAddWatermark() {
     }
 
     @Test
-    void testAlterMaterializedTableAddDistribution() {
+    void testModifySingleColumn() {
+        sql("alter materialized table mt1 modify new_column string comment 
'new_column docs'")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column 
docs'\n"
+                                + ")");
+        sql("alter materialized table mt1 modify new_column string comment 
'new_column docs'")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column 
docs'\n"
+                                + ")");
+        sql("alter materialized table mt1 modify new_column string comment 
'new_column docs' first")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column 
docs' FIRST\n"
+                                + ")");
+        sql("alter materialized table mt1 modify new_column string comment 
'new_column docs' after id")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `NEW_COLUMN` STRING COMMENT 'new_column 
docs' AFTER `ID`\n"
+                                + ")");
+        // modify column type
+        sql("alter materialized table mt1 modify new_column array<string not 
null> not null")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `NEW_COLUMN` ARRAY< STRING NOT NULL > NOT 
NULL\n"
+                                + ")");
+
+        // modify compute column
+        sql("alter materialized table mt1 modify col_int as col_a - col_b 
after col_b")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `COL_INT` AS (`COL_A` - `COL_B`) AFTER 
`COL_B`\n"
+                                + ")");
+        // modify metadata column
+        sql("alter materialized table mt1 modify col_int int metadata from 
'mk1' virtual comment 'comment_metadata' after col_b")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `COL_INT` INTEGER METADATA FROM 'mk1' 
VIRTUAL COMMENT 'comment_metadata' AFTER `COL_B`\n"
+                                + ")");
+
+        // modify nested column
+        sql("alter materialized table mt1 modify row_column.f0 int not null 
comment 'change nullability'")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `ROW_COLUMN`.`F0` INTEGER NOT NULL 
COMMENT 'change nullability'\n"
+                                + ")");
+
+        // modify nested column, shift position
+        sql("alter materialized table mt1 modify row_column.f0 int after 
row_column.f2")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  `ROW_COLUMN`.`F0` INTEGER AFTER 
`ROW_COLUMN`.`F2`\n"
+                                + ")");
+    }
+
+    @Test
+    void testModifyWatermark() {
+        sql("alter materialized table mt1 modify watermark for ts as ts")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  WATERMARK FOR `TS` AS `TS`\n"
+                                + ")");
+        sql("alter materialized table mt1 modify watermark for ts as ts - 
interval '1' second")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `MT1` MODIFY (\n"
+                                + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL 
'1' SECOND)\n"
+                                + ")");
+        sql("alter  materialized table default_database.mt1 modify watermark 
for ts as ts - interval '1' second")
+                .ok(
+                        "ALTER MATERIALIZED TABLE `DEFAULT_DATABASE`.`MT1` 
MODIFY (\n"
+                                + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL 
'1' SECOND)\n"
+                                + ")");
+        sql("alter materialized table default_catalog.default_database.mt1 
modify watermark for ts as ts - interval '1' second")
+                .ok(
+                        "ALTER MATERIALIZED TABLE 
`DEFAULT_CATALOG`.`DEFAULT_DATABASE`.`MT1` MODIFY (\n"
+                                + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL 
'1' SECOND)\n"
+                                + ")");
+
+        sql("alter materialized table default_catalog.default_database.mt1 
modify (\n"
+                        + "watermark for ts as ts - interval '1' second,\n"
+                        + "^watermark^ for f1 as now()\n"
+                        + ")")
+                .fails("Multiple WATERMARK statements is not supported yet.");

Review Comment:
   ```suggestion
                   .fails("Multiple WATERMARK declarations are not supported 
yet.");
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
                 ResolvedCatalogMaterializedTable oldTable, ConvertContext 
context) {
             return new SchemaAddConverter(oldTable, context);
         }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {}
+    }
+
+    /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+    public static class SqlAlterMaterializedTableModifySchemaConverter
+            extends SqlAlterMaterializedTableSchemaConverter<
+                    SqlAlterMaterializedTableModifySchema> {
+        @Override
+        protected SchemaConverter createSchemaConverter(
+                ResolvedCatalogMaterializedTable oldMaterializedTable, 
ConvertContext context) {
+            return new SchemaModifyConverter(oldMaterializedTable, context);
+        }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {
+            Map<String, Column> map = new HashMap<>();
+            for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+                final Column column = oldSchema.getColumn(i).get();
+                map.put(column.getName(), column);
+            }
+
+            List<Schema.UnresolvedColumn> columns = newSchema.getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                Schema.UnresolvedColumn col = columns.get(i);
+                final String name = col.getName();
+                if (map.containsKey(name) && !columnTypeKept(col, 
map.get(name))) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Failed to execute ALTER MATERIALIZED 
TABLE statement.\n"
+                                            + "Changing of %s column '%s' to 
%s column is not supported.",
+                                    getColumnKind(map.get(name).getClass()),
+                                    name,
+                                    getColumnKind(col.getClass())));
+                }
+                if (col instanceof Schema.UnresolvedComputedColumn) {
+                    continue;
+                }
+
+                LogicalType dataType =
+                        
createDataType(context.getCatalogManager().getDataTypeFactory(), col);
+                LogicalType oldDataType = 
map.get(col.getName()).getDataType().getLogicalType();
+                if (!LogicalTypeCasts.supportsImplicitCast(oldDataType, 
dataType)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Failed to execute ALTER MATERIALIZED 
TABLE statement.\n"
+                                            + "Column '%s' with type %s can 
not be changed to type %s.",
+                                    col.getName(), oldDataType, dataType));
+                }
+            }
+        }
+
+        private LogicalType createDataType(
+                DataTypeFactory dataTypeFactory, Schema.UnresolvedColumn col) {
+            if (col instanceof Schema.UnresolvedMetadataColumn) {
+                return dataTypeFactory
+                        .createDataType(((Schema.UnresolvedMetadataColumn) 
col).getDataType())
+                        .getLogicalType();
+            } else if (col instanceof Schema.UnresolvedPhysicalColumn) {
+                return dataTypeFactory
+                        .createDataType(((Schema.UnresolvedPhysicalColumn) 
col).getDataType())
+                        .getLogicalType();
+            } else {
+                throw new ValidationException("Mot expected column type " + 
col.getClass());

Review Comment:
   ```suggestion
                   throw new ValidationException("Unexpected column type " + 
col.getClass());
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
                 ResolvedCatalogMaterializedTable oldTable, ConvertContext 
context) {
             return new SchemaAddConverter(oldTable, context);
         }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {}
+    }
+
+    /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+    public static class SqlAlterMaterializedTableModifySchemaConverter
+            extends SqlAlterMaterializedTableSchemaConverter<
+                    SqlAlterMaterializedTableModifySchema> {
+        @Override
+        protected SchemaConverter createSchemaConverter(
+                ResolvedCatalogMaterializedTable oldMaterializedTable, 
ConvertContext context) {
+            return new SchemaModifyConverter(oldMaterializedTable, context);
+        }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {
+            Map<String, Column> map = new HashMap<>();
+            for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+                final Column column = oldSchema.getColumn(i).get();
+                map.put(column.getName(), column);
+            }
+
+            List<Schema.UnresolvedColumn> columns = newSchema.getColumns();

Review Comment:
   ```suggestion
               List<UnresolvedColumn> columns = newSchema.getColumns();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to