hackergin commented on code in PR #25880:
URL: https://github.com/apache/flink/pull/25880#discussion_r1906992956


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java:
##########
@@ -391,6 +451,127 @@ void testAlterMaterializedTableResume() {
                 .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 
RESUME WITH (k1: [v1])");
     }
 
+    @Test
+    void testAlterMaterializedTableAsQuery() throws TableNotExistException {
+        String sql =
+                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as 
e, cast('123' as string) as f FROM t3";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
+
+        AlterMaterializedTableAsQueryOperation op =
+                (AlterMaterializedTableAsQueryOperation) operation;
+        assertThat(op.getTableChanges())
+                .isEqualTo(
+                        Arrays.asList(
+                                TableChange.add(
+                                        Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.add(
+                                        Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                                + "FROM 
`builtin`.`default`.`t3`")));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
+                                + "FROM `builtin`.`default`.`t3`");
+
+        // new table only difference schema & definition query with old table.
+        CatalogMaterializedTable oldTable =
+                (CatalogMaterializedTable)
+                        catalog.getTable(
+                                new 
ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
+        CatalogMaterializedTable newTable = op.getNewMaterializedTable();
+
+        
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
+        assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
+                .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
+        assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs())
+                .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs());
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+        
assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode());
+        
assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus());
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+
+        List<Schema.UnresolvedColumn> addedColumn =
+                newTable.getUnresolvedSchema().getColumns().stream()
+                        .filter(
+                                column ->
+                                        !oldTable.getUnresolvedSchema()
+                                                .getColumns()
+                                                .contains(column))
+                        .collect(Collectors.toList());
+        // added column should be a nullable column.
+        assertThat(addedColumn)
+                .isEqualTo(
+                        Arrays.asList(
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "e", 
DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryWithConflictColumnName() {
+        String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, 
d, c as a FROM t3";

Review Comment:
   I created jira https://issues.apache.org/jira/browse/FLINK-37064 to track 
this issue. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to