hackergin commented on code in PR #25880: URL: https://github.com/apache/flink/pull/25880#discussion_r1906992956
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java: ########## @@ -391,6 +451,127 @@ void testAlterMaterializedTableResume() { .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); } + @Test + void testAlterMaterializedTableAsQuery() throws TableNotExistException { + String sql = + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class); + + AlterMaterializedTableAsQueryOperation op = + (AlterMaterializedTableAsQueryOperation) operation; + assertThat(op.getTableChanges()) + .isEqualTo( + Arrays.asList( + TableChange.add( + Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.add( + Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.modifyDefinitionQuery( + "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t3`"))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t3`"); + + // new table only difference schema & definition query with old table. + CatalogMaterializedTable oldTable = + (CatalogMaterializedTable) + catalog.getTable( + new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl")); + CatalogMaterializedTable newTable = op.getNewMaterializedTable(); + + assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema()); + assertThat(oldTable.getUnresolvedSchema().getPrimaryKey()) + .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); + assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) + .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode()); + assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus()); + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + + List<Schema.UnresolvedColumn> addedColumn = + newTable.getUnresolvedSchema().getColumns().stream() + .filter( + column -> + !oldTable.getUnresolvedSchema() + .getColumns() + .contains(column)) + .collect(Collectors.toList()); + // added column should be a nullable column. + assertThat(addedColumn) + .isEqualTo( + Arrays.asList( + new Schema.UnresolvedPhysicalColumn( + "e", DataTypes.VARCHAR(Integer.MAX_VALUE)), + new Schema.UnresolvedPhysicalColumn( + "f", DataTypes.VARCHAR(Integer.MAX_VALUE)))); + } + + @Test + void testAlterMaterializedTableAsQueryWithConflictColumnName() { + String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, c as a FROM t3"; Review Comment: I created jira https://issues.apache.org/jira/browse/FLINK-37064 to track this issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org