snuyanzin commented on code in PR #27327:
URL: https://github.com/apache/flink/pull/27327#discussion_r2618860742


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -66,5 +83,90 @@ protected SchemaConverter createSchemaConverter(
                 ResolvedCatalogMaterializedTable oldTable, ConvertContext 
context) {
             return new SchemaAddConverter(oldTable, context);
         }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {}
+    }
+
+    /** A converter for {@link SqlAlterMaterializedTableModifySchema}. */
+    public static class SqlAlterMaterializedTableModifySchemaConverter
+            extends SqlAlterMaterializedTableSchemaConverter<
+                    SqlAlterMaterializedTableModifySchema> {
+        @Override
+        protected SchemaConverter createSchemaConverter(
+                ResolvedCatalogMaterializedTable oldMaterializedTable, 
ConvertContext context) {
+            return new SchemaModifyConverter(oldMaterializedTable, context);
+        }
+
+        @Override
+        protected void validateChanges(
+                ResolvedSchema oldSchema, Schema newSchema, ConvertContext 
context) {
+            Map<String, Column> map = new HashMap<>();
+            for (int i = 0; i < oldSchema.getColumnCount(); i++) {
+                final Column column = oldSchema.getColumn(i).get();
+                map.put(column.getName(), column);
+            }
+
+            List<Schema.UnresolvedColumn> columns = newSchema.getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                Schema.UnresolvedColumn col = columns.get(i);
+                final String name = col.getName();
+                if (map.containsKey(name) && !columnTypeKept(col, 
map.get(name))) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Failed to execute ALTER MATERIALIZED 
TABLE statement.\n"
+                                            + "Changing of %s column '%s' to 
%s column is not supported.",
+                                    getColumnKind(map.get(name).getClass()),
+                                    name,
+                                    getColumnKind(col.getClass())));
+                }
+                if (col instanceof Schema.UnresolvedComputedColumn) {
+                    continue;
+                }
+
+                LogicalType dataType =
+                        
createDataType(context.getCatalogManager().getDataTypeFactory(), col);
+                LogicalType oldDataType = 
map.get(col.getName()).getDataType().getLogicalType();
+                if (!LogicalTypeCasts.supportsImplicitCast(oldDataType, 
dataType)) {

Review Comment:
   Added a comment referencing the code for the `CREATE [MATERIALIZED] TABLE` 
path, which has similar logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to