twalthr commented on code in PR #25359: URL: https://github.com/apache/flink/pull/25359#discussion_r1768388152
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java:
##########
@@ -209,6 +213,78 @@ public static RelNode convertSinkToRel(
                 sink);
     }

+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertCtasToRel(

Review Comment:
   ```suggestion
       public static RelNode convertCreateTableAsToRel(
   ```
   avoid abbreviations, we cannot assume that everybody knows them

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java:
##########
@@ -209,6 +213,78 @@ public static RelNode convertSinkToRel(
                 sink);
     }

+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertCtasToRel(
+            FlinkRelBuilder relBuilder,
+            RelNode input,
+            Catalog catalog,
+            CreateTableASOperation createTableASOperation,
+            DynamicTableSink sink) {
+        final ResolvedCatalogTable catalogTable =
+                (ResolvedCatalogTable)
+                        createTableASOperation.getCreateTableOperation().getCatalogTable();
+
+        final ObjectIdentifier identifier =
+                createTableASOperation.getCreateTableOperation().getTableIdentifier();
+
+        final ContextResolvedTable contextResolvedTable;
+        if (createTableASOperation.getCreateTableOperation().isTemporary()) {
+            contextResolvedTable = ContextResolvedTable.temporary(identifier, catalogTable);
+        } else {
+            contextResolvedTable =
+                    ContextResolvedTable.permanent(identifier, catalog, catalogTable);
+        }
+
+        return convertSinkToRel(
+                relBuilder,
+                input,
+                Collections.emptyMap(),
+                contextResolvedTable,
+                createTableASOperation.getSinkModifyStaticPartitions(),
+                null,
+                createTableASOperation.getSinkModifyOverwrite(),
+                sink);
+    }
+
+    /**
+     * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if
+     * necessary.
+     */
+    public static RelNode convertRtasToRel(
+            FlinkRelBuilder relBuilder,
+            RelNode input,
+            Catalog catalog,
+            ReplaceTableAsOperation replaceTableASOperation,
+            DynamicTableSink sink) {
+        final ResolvedCatalogTable catalogTable =
+                (ResolvedCatalogTable)
+                        replaceTableASOperation.getCreateTableOperation().getCatalogTable();
+
+        final ObjectIdentifier identifier =
+                replaceTableASOperation.getCreateTableOperation().getTableIdentifier();
+
+        final ContextResolvedTable contextResolvedTable;
+        if (replaceTableASOperation.getCreateTableOperation().isTemporary()) {

Review Comment:
   pass `replaceTableASOperation.getCreateTableOperation()` as a parameter and merge `convertRtasToRel` and `convertCtasToRel` to reduce code duplication

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -658,6 +659,37 @@ public void testCreateTableInvalidDistribution() {
                         "Invalid bucket key 'f3'. A bucket key for a distribution must reference a physical column in the schema. Available columns are: [a]");
     }

+    @Test
+    public void testExplainCreateTableAs() {

Review Comment:
   I don't think we need this test. The explain test below that outputs the full pipeline should be sufficient.

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java:
##########
@@ -61,6 +62,27 @@ public void testReplaceTableAs() {
         testCommonReplaceTableAs(sql, tableName, tableComment);
     }

+    @Test
+    public void testExplainCreateOrReplaceTableAs() {

Review Comment:
   I don't think we need this test. The explain test below that outputs the full pipeline should be sufficient.
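To make the suggested merge of `convertRtasToRel` and `convertCtasToRel` concrete, here is a minimal, self-contained sketch of the pattern. It does not use the real Flink classes: `CreateTableOperationStub`, `SinkConversionSketch`, and the string return value are hypothetical stand-ins introduced only for illustration, and the method name simply follows the naming suggested above. The idea is that the caller extracts `getCreateTableOperation()`, the static partitions, and the overwrite flag from either operation and passes them to a single helper.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for the create-table operation wrapped by both CTAS and RTAS. */
final class CreateTableOperationStub {
    private final String tableIdentifier;
    private final boolean temporary;

    CreateTableOperationStub(String tableIdentifier, boolean temporary) {
        this.tableIdentifier = tableIdentifier;
        this.temporary = temporary;
    }

    String getTableIdentifier() {
        return tableIdentifier;
    }

    boolean isTemporary() {
        return temporary;
    }
}

/** Sketch only: one helper instead of two near-identical CTAS/RTAS methods. */
final class SinkConversionSketch {

    private SinkConversionSketch() {}

    /**
     * Takes the wrapped create-table operation as a parameter, so it no longer
     * matters whether the caller holds a CTAS or an RTAS operation.
     */
    static String convertCreateTableAsToRel(
            CreateTableOperationStub createTableOperation,
            Map<String, String> staticPartitions,
            boolean overwrite) {
        // The temporary/permanent branch is the logic both original methods duplicated.
        final String resolvedTable =
                createTableOperation.isTemporary()
                        ? "temporary(" + createTableOperation.getTableIdentifier() + ")"
                        : "permanent(" + createTableOperation.getTableIdentifier() + ")";

        // Stands in for the shared delegation to convertSinkToRel(...).
        return "sink["
                + resolvedTable
                + ", partitions="
                + staticPartitions
                + ", overwrite="
                + overwrite
                + "]";
    }

    public static void main(String[] args) {
        // A CTAS-like caller and an RTAS-like caller go through the same helper.
        System.out.println(
                convertCreateTableAsToRel(
                        new CreateTableOperationStub("cat.db.t1", false),
                        Collections.emptyMap(),
                        false));
        System.out.println(
                convertCreateTableAsToRel(
                        new CreateTableOperationStub("cat.db.t2", true),
                        Collections.emptyMap(),
                        true));
    }
}
```

In the actual PR the helper would keep the `FlinkRelBuilder`, `RelNode`, `Catalog`, and `DynamicTableSink` parameters and return a `RelNode`; those are left out here only to keep the sketch runnable without Flink on the classpath.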
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala:
##########
@@ -268,6 +268,50 @@ abstract class PlannerBase(
           getFlinkContext.getClassLoader
         )

+      case ctasOperation: CreateTableASOperation =>

Review Comment:
   can we dedup more logic? in the end RTAS and CTAS are almost the same. They only differ in constructing one `getCreateTableOperation`?

##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java:
##########
@@ -41,6 +41,10 @@ public interface ParserResource {
     @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
     Resources.ExInst<ParseException> explainDetailIsDuplicate();

+    @Resources.BaseMessage(
+            "CREATE OR REPLACE statement not supported by EXPLAIN. Only CTAS/RTAS statements are supported.")

Review Comment:
   ```suggestion
               "Unsupported CREATE OR REPLACE statement for EXPLAIN. The statement must define a query using the AS clause (i.e. CTAS/RTAS statements).")
   ```