lsyldliu commented on code in PR #20392: URL: https://github.com/apache/flink/pull/20392#discussion_r934198540
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala: ########## @@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase { tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb")) checkResult("SELECT 1", Seq(row(1))) } + + @Test + def testCreateTableAsSelect(): Unit = { Review Comment: Please also test the case when the `connector` option is not specified. Moreover, can we also add a test for a managed table? ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala: ########## @@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase ) assertEquals(expected.sorted, result.sorted) } + + @Test + def testCreateTableAsSelect(): Unit = { Review Comment: ditto ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala: ########## @@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase ) assertEquals(expected.sorted, result.sorted) } + + @Test + def testCreateTableAsSelect(): Unit = { + tEnv + .executeSql(""" + |CREATE TABLE MyCtasTable + | WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) AS + | SELECT + | `person`, + | `votes` + | FROM + | src + |""".stripMargin) + .await() + val actual = TestValuesTableFactory.getResults("MyCtasTable") + val expected = List( + "+I[jason, 1]", + "+I[jason, 1]", + "+I[jason, 1]", + "+I[jason, 1]" + ) + Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted) Review Comment: Why not follow the other tests and use `Assert.assertEquals(expected.sorted, result1.sorted)` directly?
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java: ########## @@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) { sqlCreateTable.isTemporary()); } + /** Convert the {@link SqlCreateTableAs} node. */ + Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTable) { + sqlCreateTable.getTableConstraints().forEach(validateTableConstraint); + + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlCreateTable.fullTableName()); + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + PlannerQueryOperation query = + (PlannerQueryOperation) + SqlToOperationConverter.convert( + flinkPlanner, catalogManager, sqlCreateTable.getAsQuery()) + .orElseThrow( + () -> + new TableException( + "CTAS Unsupported node type " Review Comment: ```suggestion "CTAS unsupported node type " ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java: ########## @@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) { sqlCreateTable.isTemporary()); } + /** Convert the {@link SqlCreateTableAs} node. 
*/ + Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTable) { + sqlCreateTable.getTableConstraints().forEach(validateTableConstraint); + + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlCreateTable.fullTableName()); + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + + PlannerQueryOperation query = + (PlannerQueryOperation) + SqlToOperationConverter.convert( + flinkPlanner, catalogManager, sqlCreateTable.getAsQuery()) + .orElseThrow( + () -> + new TableException( + "CTAS Unsupported node type " + + sqlCreateTable + .getAsQuery() + .getClass() + .getSimpleName())); + Map<String, String> properties = new HashMap<>(); Review Comment: Here we can reuse the `createCatalogTable` method. ``` UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateTable.fullTableName()); ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); CatalogTable catalogTable = createCatalogTable(sqlCreateTable); CreateTableOperation createTableOperation = new CreateTableOperation( identifier, CatalogTable.of(Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(), catalogTable.getComment(), catalogTable.getPartitionKeys(), catalogTable.getOptions()), sqlCreateTable.isIfNotExists(), sqlCreateTable.isTemporary()); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java: ########## @@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) { sqlCreateTable.isTemporary()); } + /** Convert the {@link SqlCreateTableAs} node. */ + Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTable) { + sqlCreateTable.getTableConstraints().forEach(validateTableConstraint); Review Comment: This validation is not needed; the constraints have already been validated in the `SqlCreateTableAs#validate` method.
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala: ########## @@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase { tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb")) checkResult("SELECT 1", Seq(row(1))) } + Review Comment: Please also add plan-related tests in `TableSinkTest`. Also consider testing the case when the `connector` option is not specified. Moreover, can we also add a test for a managed table? ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala: ########## @@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase { tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, MemorySize.parse("1kb")) checkResult("SELECT 1", Seq(row(1))) } + + @Test + def testCreateTableAsSelect(): Unit = { + val dataId = TestValuesTableFactory.registerData(smallData3) + tEnv.executeSql(s""" + |CREATE TABLE MyTable ( + | `a` INT, + | `b` BIGINT, + | `c` STRING + |) WITH ( + | 'connector' = 'values', + | 'bounded' = 'true', + | 'data-id' = '$dataId' + |) + """.stripMargin) + + val resultPath = BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath + tEnv + .executeSql(s""" + |CREATE TABLE MyCtasTable + | WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '$resultPath' + |) AS + | SELECT * FROM MyTable + """.stripMargin) + .await() + val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world") + val result = TableTestUtil.readFromFile(resultPath) + Assertions.assertThat(result.sorted).isEqualTo(expected.sorted) Review Comment: Why not follow the other tests and use `Assert.assertEquals(expected.sorted, result1.sorted)` directly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org