JingsongLi commented on a change in pull request #18394:
URL: https://github.com/apache/flink/pull/18394#discussion_r787387670
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ManagedTableFactory.java
##########
@@ -50,6 +51,13 @@ default String factoryIdentifier() {

     /** Notifies the listener that a table drop occurred. */
     void onDropTable(Context context, boolean ignoreIfNotExists);

+    /**
+     * Notifies the listener that a table compaction occurred.
+     *
+     * @return dynamic options of the file entries under compaction for this table.

Review comment:
       Dynamic options for source and sink? There is no file concept in Flink SQL.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
##########
@@ -186,6 +195,44 @@ class FlinkRelBuilder(
     push(relNode)
     this
   }
+
+  def compactScan(
+      identifier: ObjectIdentifier,
+      catalogTable: ResolvedCatalogTable,
+      isTemporary: Boolean,
+      compactOptions: util.Map[String, String]): RelBuilder = {
+    val flinkContext = context.unwrap(classOf[FlinkContext])
+    val config = flinkContext.getTableConfig.getConfiguration
+
+    val hints = new util.ArrayList[RelHint]

Review comment:
       Can you just use `CatalogSourceTable` from `FlinkCalciteCatalogReader`? (You can refer to `RelBuilder`.) Then you can create a `ToRelContext` for it.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableCompactOperation.java
##########
@@ -18,41 +18,32 @@
 package org.apache.flink.table.operations.ddl;

-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.operations.CatalogQueryOperation;

-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.Map;

 /** Operation to describe "ALTER TABLE [PARTITION partition_spec] COMPACT" statement. */
-public class AlterTableCompactOperation extends AlterTableOperation {
+public class AlterTableCompactOperation extends CatalogQueryOperation {

-    private final CatalogPartitionSpec partitionSpec;
+    private final ResolvedCatalogTable resolvedManagedTable;
+    private final Map<String, String> compactOptions;

     public AlterTableCompactOperation(

Review comment:
       Maybe we can drop this class and just add `dynamicOptions` to `CatalogQueryOperation`.

##########
File path: flink-formats/flink-orc/pom.xml
##########
@@ -145,6 +145,19 @@ under the License.
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+        <dependency>

Review comment:
       Why modify the orc, parquet, python, and sql-client modules?

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
##########
@@ -186,6 +195,44 @@ class FlinkRelBuilder(
     push(relNode)
     this
   }
+
+  def compactScan(

Review comment:
       Maybe just `scan(ObjectIdentifier objectIdentifier, Map<String, String> dynamicOptions)`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
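A rough sketch of the shape the last review comment suggests: a generic `scan` overload that carries per-query dynamic options, instead of a compaction-specific `compactScan`. The `SketchRelBuilder` and `TableId` types below are simplified hypothetical stand-ins for the planner's `FlinkRelBuilder` and Flink's `ObjectIdentifier`, not the real classes; only the method signature and option handling are meant to illustrate the idea.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in types; not the real Flink/Calcite classes.
final class SketchRelBuilder {

    /** Simplified stand-in for org.apache.flink.table.catalog.ObjectIdentifier. */
    static final class TableId {
        final String catalog;
        final String database;
        final String table;

        TableId(String catalog, String database, String table) {
            this.catalog = catalog;
            this.database = database;
            this.table = table;
        }
    }

    private TableId lastScanned;
    private Map<String, String> lastDynamicOptions = Collections.emptyMap();

    /**
     * The suggested generic entry point: an ordinary table scan that also
     * carries dynamic options (which a real implementation could attach as
     * an OPTIONS-style hint), rather than a compaction-specific method.
     */
    SketchRelBuilder scan(TableId id, Map<String, String> dynamicOptions) {
        this.lastScanned = id;
        // Copy defensively so callers cannot mutate the stored options.
        this.lastDynamicOptions = new LinkedHashMap<>(dynamicOptions);
        return this;
    }

    TableId scannedTable() {
        return lastScanned;
    }

    Map<String, String> dynamicOptions() {
        return lastDynamicOptions;
    }
}
```

With a shape like this, the compaction path becomes just one caller that passes its compaction-related options, so no dedicated `compactScan` entry point is needed on the builder.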