spena commented on code in PR #25359: URL: https://github.com/apache/flink/pull/25359#discussion_r1768799456
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java: ########## @@ -209,6 +213,78 @@ public static RelNode convertSinkToRel( sink); } + /** + * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if + * necessary. + */ + public static RelNode convertCtasToRel( + FlinkRelBuilder relBuilder, + RelNode input, + Catalog catalog, + CreateTableASOperation createTableASOperation, + DynamicTableSink sink) { + final ResolvedCatalogTable catalogTable = + (ResolvedCatalogTable) + createTableASOperation.getCreateTableOperation().getCatalogTable(); + + final ObjectIdentifier identifier = + createTableASOperation.getCreateTableOperation().getTableIdentifier(); + + final ContextResolvedTable contextResolvedTable; + if (createTableASOperation.getCreateTableOperation().isTemporary()) { + contextResolvedTable = ContextResolvedTable.temporary(identifier, catalogTable); + } else { + contextResolvedTable = + ContextResolvedTable.permanent(identifier, catalog, catalogTable); + } + + return convertSinkToRel( + relBuilder, + input, + Collections.emptyMap(), + contextResolvedTable, + createTableASOperation.getSinkModifyStaticPartitions(), + null, + createTableASOperation.getSinkModifyOverwrite(), + sink); + } + + /** + * Converts a given {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if + * necessary. 
+ */ + public static RelNode convertRtasToRel( + FlinkRelBuilder relBuilder, + RelNode input, + Catalog catalog, + ReplaceTableAsOperation replaceTableASOperation, + DynamicTableSink sink) { + final ResolvedCatalogTable catalogTable = + (ResolvedCatalogTable) + replaceTableASOperation.getCreateTableOperation().getCatalogTable(); + + final ObjectIdentifier identifier = + replaceTableASOperation.getCreateTableOperation().getTableIdentifier(); + + final ContextResolvedTable contextResolvedTable; + if (replaceTableASOperation.getCreateTableOperation().isTemporary()) { Review Comment: Done ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java: ########## @@ -658,6 +659,37 @@ public void testCreateTableInvalidDistribution() { "Invalid bucket key 'f3'. A bucket key for a distribution must reference a physical column in the schema. Available columns are: [a]"); } + @Test + public void testExplainCreateTableAs() { Review Comment: Done ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java: ########## @@ -61,6 +62,27 @@ public void testReplaceTableAs() { testCommonReplaceTableAs(sql, tableName, tableComment); } + @Test + public void testExplainCreateOrReplaceTableAs() { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org