spena commented on code in PR #25359:
URL: https://github.com/apache/flink/pull/25359#discussion_r1768799456


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java:
##########
@@ -209,6 +213,78 @@ public static RelNode convertSinkToRel(
                 sink);
     }
 
+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertCtasToRel(
+            FlinkRelBuilder relBuilder,
+            RelNode input,
+            Catalog catalog,
+            CreateTableASOperation createTableASOperation,
+            DynamicTableSink sink) {
+        final ResolvedCatalogTable catalogTable =
+                (ResolvedCatalogTable)
+                        createTableASOperation.getCreateTableOperation().getCatalogTable();
+
+        final ObjectIdentifier identifier =
+                createTableASOperation.getCreateTableOperation().getTableIdentifier();
+
+        final ContextResolvedTable contextResolvedTable;
+        if (createTableASOperation.getCreateTableOperation().isTemporary()) {
+            contextResolvedTable = ContextResolvedTable.temporary(identifier, catalogTable);
+        } else {
+            contextResolvedTable =
+                    ContextResolvedTable.permanent(identifier, catalog, catalogTable);
+        }
+
+        return convertSinkToRel(
+                relBuilder,
+                input,
+                Collections.emptyMap(),
+                contextResolvedTable,
+                createTableASOperation.getSinkModifyStaticPartitions(),
+                null,
+                createTableASOperation.getSinkModifyOverwrite(),
+                sink);
+    }
+
+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertRtasToRel(
+            FlinkRelBuilder relBuilder,
+            RelNode input,
+            Catalog catalog,
+            ReplaceTableAsOperation replaceTableASOperation,
+            DynamicTableSink sink) {
+        final ResolvedCatalogTable catalogTable =
+                (ResolvedCatalogTable)
+                        replaceTableASOperation.getCreateTableOperation().getCatalogTable();
+
+        final ObjectIdentifier identifier =
+                replaceTableASOperation.getCreateTableOperation().getTableIdentifier();
+
+        final ContextResolvedTable contextResolvedTable;
+        if (replaceTableASOperation.getCreateTableOperation().isTemporary()) {

Review Comment:
   Done



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -658,6 +659,37 @@ public void testCreateTableInvalidDistribution() {
                         "Invalid bucket key 'f3'. A bucket key for a distribution must reference a physical column in the schema. Available columns are: [a]");
     }
 
+    @Test
+    public void testExplainCreateTableAs() {

Review Comment:
   Done



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java:
##########
@@ -61,6 +62,27 @@ public void testReplaceTableAs() {
         testCommonReplaceTableAs(sql, tableName, tableComment);
     }
 
+    @Test
+    public void testExplainCreateOrReplaceTableAs() {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to