lsyldliu commented on code in PR #20392:
URL: https://github.com/apache/flink/pull/20392#discussion_r934198540


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, 
MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {

Review Comment:
   Please also test the case where the `connector` option is not specified.
Moreover, can we also add a test for a managed table?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala:
##########
@@ -192,4 +193,30 @@ class TableSinkITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
     )
     assertEquals(expected.sorted, result.sorted)
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    tEnv
+      .executeSql("""
+                    |CREATE TABLE MyCtasTable
+                    | WITH (
+                    |   'connector' = 'values',
+                    |   'sink-insert-only' = 'true'
+                    |) AS
+                    |  SELECT
+                    |    `person`,
+                    |    `votes`
+                    |  FROM
+                    |    src
+                    |""".stripMargin)
+      .await()
+    val actual = TestValuesTableFactory.getResults("MyCtasTable")
+    val expected = List(
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]",
+      "+I[jason, 1]"
+    )
+    Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)

Review Comment:
   Why not follow the other tests and use `Assert.assertEquals(expected.sorted, 
result1.sorted)` directly?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
                 sqlCreateTable.isTemporary());
     }
 
+    /** Convert the {@link SqlCreateTableAs} node. */
+    Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, 
SqlCreateTableAs sqlCreateTable) {
+        sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
+
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
+        ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        PlannerQueryOperation query =
+                (PlannerQueryOperation)
+                        SqlToOperationConverter.convert(
+                                        flinkPlanner, catalogManager, 
sqlCreateTable.getAsQuery())
+                                .orElseThrow(
+                                        () ->
+                                                new TableException(
+                                                        "CTAS Unsupported node 
type "

Review Comment:
   ```suggestion
                                                           "CTAS unsupported 
node type "
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
                 sqlCreateTable.isTemporary());
     }
 
+    /** Convert the {@link SqlCreateTableAs} node. */
+    Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, 
SqlCreateTableAs sqlCreateTable) {
+        sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);
+
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
+        ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        PlannerQueryOperation query =
+                (PlannerQueryOperation)
+                        SqlToOperationConverter.convert(
+                                        flinkPlanner, catalogManager, 
sqlCreateTable.getAsQuery())
+                                .orElseThrow(
+                                        () ->
+                                                new TableException(
+                                                        "CTAS Unsupported node 
type "
+                                                                + 
sqlCreateTable
+                                                                        
.getAsQuery()
+                                                                        
.getClass()
+                                                                        
.getSimpleName()));
+        Map<String, String> properties = new HashMap<>();

Review Comment:
   Here we can reuse the `createCatalogTable` method.
   ```
   UnresolvedIdentifier unresolvedIdentifier =
                   UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
   ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
   CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
   
   CreateTableOperation createTableOperation =
                   new CreateTableOperation(
                           identifier,
                        
CatalogTable.of(Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(),
                                   catalogTable.getComment(),
                                   catalogTable.getPartitionKeys(),
                                   catalogTable.getOptions()),
                           sqlCreateTable.isIfNotExists(),
                           sqlCreateTable.isTemporary());
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java:
##########
@@ -82,6 +87,60 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
                 sqlCreateTable.isTemporary());
     }
 
+    /** Convert the {@link SqlCreateTableAs} node. */
+    Operation convertCreateTableAS(FlinkPlannerImpl flinkPlanner, 
SqlCreateTableAs sqlCreateTable) {
+        sqlCreateTable.getTableConstraints().forEach(validateTableConstraint);

Review Comment:
   This validation is not needed; it has already been performed in the
`SqlCreateTableAs#validate` method.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, 
MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+

Review Comment:
   Please also add plan-related tests in `TableSinkTest`. Also consider
testing the case where the `connector` option is not specified. Moreover, can we
also add a test for a managed table?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala:
##########
@@ -94,4 +94,36 @@ class TableSinkITCase extends BatchTestBase {
     tEnv.getConfig.set(CollectSinkOperatorFactory.MAX_BATCH_SIZE, 
MemorySize.parse("1kb"))
     checkResult("SELECT 1", Seq(row(1)))
   }
+
+  @Test
+  def testCreateTableAsSelect(): Unit = {
+    val dataId = TestValuesTableFactory.registerData(smallData3)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE MyTable (
+                       |  `a` INT,
+                       |  `b` BIGINT,
+                       |  `c` STRING
+                       |) WITH (
+                       |  'connector' = 'values',
+                       |  'bounded' = 'true',
+                       |  'data-id' = '$dataId'
+                       |)
+       """.stripMargin)
+
+    val resultPath = 
BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+    tEnv
+      .executeSql(s"""
+                     |CREATE TABLE MyCtasTable
+                     | WITH (
+                     |  'connector' = 'filesystem',
+                     |  'format' = 'testcsv',
+                     |  'path' = '$resultPath'
+                     |) AS
+                     | SELECT * FROM MyTable
+       """.stripMargin)
+      .await()
+    val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    val result = TableTestUtil.readFromFile(resultPath)
+    Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)

Review Comment:
   Why not follow the other tests and use `Assert.assertEquals(expected.sorted, 
result1.sorted)` directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to