twalthr commented on code in PR #25359:
URL: https://github.com/apache/flink/pull/25359#discussion_r1768388152


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java:
##########
@@ -209,6 +213,78 @@ public static RelNode convertSinkToRel(
                 sink);
     }
 
+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertCtasToRel(

Review Comment:
   ```suggestion
       public static RelNode convertCreateTableAsToRel(
   ```
   Avoid abbreviations; we cannot assume that everybody knows them.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java:
##########
@@ -209,6 +213,78 @@ public static RelNode convertSinkToRel(
                 sink);
     }
 
+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertCtasToRel(
+            FlinkRelBuilder relBuilder,
+            RelNode input,
+            Catalog catalog,
+            CreateTableASOperation createTableASOperation,
+            DynamicTableSink sink) {
+        final ResolvedCatalogTable catalogTable =
+                (ResolvedCatalogTable)
+                        createTableASOperation.getCreateTableOperation().getCatalogTable();
+
+        final ObjectIdentifier identifier =
+                createTableASOperation.getCreateTableOperation().getTableIdentifier();
+
+        final ContextResolvedTable contextResolvedTable;
+        if (createTableASOperation.getCreateTableOperation().isTemporary()) {
+            contextResolvedTable = ContextResolvedTable.temporary(identifier, catalogTable);
+        } else {
+            contextResolvedTable =
+                    ContextResolvedTable.permanent(identifier, catalog, catalogTable);
+        }
+
+        return convertSinkToRel(
+                relBuilder,
+                input,
+                Collections.emptyMap(),
+                contextResolvedTable,
+                createTableASOperation.getSinkModifyStaticPartitions(),
+                null,
+                createTableASOperation.getSinkModifyOverwrite(),
+                sink);
+    }
+
+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertRtasToRel(
+            FlinkRelBuilder relBuilder,
+            RelNode input,
+            Catalog catalog,
+            ReplaceTableAsOperation replaceTableASOperation,
+            DynamicTableSink sink) {
+        final ResolvedCatalogTable catalogTable =
+                (ResolvedCatalogTable)
+                        replaceTableASOperation.getCreateTableOperation().getCatalogTable();
+
+        final ObjectIdentifier identifier =
+                replaceTableASOperation.getCreateTableOperation().getTableIdentifier();
+
+        final ContextResolvedTable contextResolvedTable;
+        if (replaceTableASOperation.getCreateTableOperation().isTemporary()) {

Review Comment:
   Pass `replaceTableASOperation.getCreateTableOperation()` as a parameter and merge `convertRtasToRel` and `convertCtasToRel` to reduce code duplication.
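
   Following that suggestion, a merged helper could look roughly like the sketch below. This is a non-compilable sketch against the planner internals, not a definitive implementation: the method name follows the earlier naming comment (`convertCreateTableAsToRel`), the parameter list is an assumption, and only calls already visible in this diff are used.

   ```java
   /**
    * Shared conversion path for CREATE TABLE AS and REPLACE TABLE AS. Both
    * callers pass in their inner CreateTableOperation, so the temporary vs.
    * permanent branching lives in one place.
    */
   public static RelNode convertCreateTableAsToRel(
           FlinkRelBuilder relBuilder,
           RelNode input,
           Catalog catalog,
           CreateTableOperation createTableOperation,
           Map<String, String> staticPartitions,
           boolean isOverwrite,
           DynamicTableSink sink) {
       final ResolvedCatalogTable catalogTable =
               (ResolvedCatalogTable) createTableOperation.getCatalogTable();
       final ObjectIdentifier identifier = createTableOperation.getTableIdentifier();

       final ContextResolvedTable contextResolvedTable =
               createTableOperation.isTemporary()
                       ? ContextResolvedTable.temporary(identifier, catalogTable)
                       : ContextResolvedTable.permanent(identifier, catalog, catalogTable);

       return convertSinkToRel(
               relBuilder,
               input,
               Collections.emptyMap(),
               contextResolvedTable,
               staticPartitions,
               null,
               isOverwrite,
               sink);
   }
   ```

   The two existing entry points would then reduce to one-line delegations that pass `createTableASOperation.getCreateTableOperation()` (respectively `replaceTableASOperation.getCreateTableOperation()`) together with their static partitions and overwrite flags.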



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -658,6 +659,37 @@ public void testCreateTableInvalidDistribution() {
                         "Invalid bucket key 'f3'. A bucket key for a distribution must reference a physical column in the schema. Available columns are: [a]");
     }
 
+    @Test
+    public void testExplainCreateTableAs() {

Review Comment:
   I don't think we need this test. The explain test below that outputs the full pipeline should be sufficient.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java:
##########
@@ -61,6 +62,27 @@ public void testReplaceTableAs() {
         testCommonReplaceTableAs(sql, tableName, tableComment);
     }
 
+    @Test
+    public void testExplainCreateOrReplaceTableAs() {

Review Comment:
   I don't think we need this test. The explain test below that outputs the full pipeline should be sufficient.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala:
##########
@@ -268,6 +268,50 @@ abstract class PlannerBase(
           getFlinkContext.getClassLoader
         )
 
+      case ctasOperation: CreateTableASOperation =>

Review Comment:
   Can we dedup more logic? In the end, RTAS and CTAS are almost the same; they differ only in which operation supplies the `getCreateTableOperation()` result.



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java:
##########
@@ -41,6 +41,10 @@ public interface ParserResource {
     @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
     Resources.ExInst<ParseException> explainDetailIsDuplicate();
 
+    @Resources.BaseMessage(
+            "CREATE OR REPLACE statement not supported by EXPLAIN. Only CTAS/RTAS statements are supported.")

Review Comment:
   ```suggestion
               "Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
   ```



-- 
This is an automated message from the Apache Git Service.