Hi all,

## SupportsParallelismReport

Now that FLIP-95 [1] is ready, only Hive and Filesystem still use the old interfaces, and we are considering migrating them to the new interfaces.

One problem is that, in the old interface implementation, connectors infer parallelism by themselves instead of relying on a global parallelism configuration. Hive and Filesystem determine the parallelism from the number of files and the size of the files. In this way, a large table may run with a parallelism in the thousands while a small table runs with a parallelism of only 10, which minimizes the cost of task scheduling. This situation is very common in batch computing. For example, in a star schema, a large table needs to be joined with multiple small tables.

So we should give this ability to the new table source interfaces. The interface can be:

    /**
     * Gives a source the ability to report its parallelism.
     *
     * <p>After filter push-down and partition push-down, the source can have more information,
     * which can help it infer a more effective parallelism.
     */
    @Internal
    public interface SupportsParallelismReport {

        /**
         * Report parallelism from source or sink. The parallelism of an operator must be at least 1,
         * or -1 (use system default).
         */
        int reportParallelism();
    }

Rejected Alternatives:

- SupportsSplitReport: What is the relationship between this split and the split of FLIP-27? Do we have to match them one to one? I think they are two independent things. In fact, in the design of FLIP-27, splits and parallelism are not bound one to one.
- SupportsPartitionReport: What is a partition? In Table/SQL, a partition is a special concept of a table. It should not be mixed up with parallelism.

## SupportsStatisticsReport

As with parallelism, statistics reported by the source will be more appropriate and accurate. After filter push-down and partition push-down, the source can have more information, which can help it infer more accurate statistics.
By contrast, if we infer statistics only from the planner itself, there may be a big gap between the statistics and the real situation.

The interface:

    /**
     * Gives a {@link ScanTableSource} the ability to report table statistics.
     *
     * <p>Statistics can be inferred from real data in real time, which is more accurate than the
     * statistics in the catalog.
     *
     * <p>After filter push-down and partition push-down, the source can have more information,
     * which can help it infer more effective table statistics.
     */
    @Internal
    public interface SupportsStatisticsReport {

        /**
         * Reports {@link TableStats} from old table stats.
         */
        TableStats reportTableStatistics(TableStats oldStats);
    }

When should the planner ask the source for the reported statistics?

- First of all, this call can be expensive (it has to look at the metadata of the files), so it must not be called repeatedly.
- We need to call it after FilterPushdown, because that is when the most accurate information is available. We also need to call it before CBO (such as JoinReorder and choosing between BroadcastJoin and ShuffleJoin), because that is where statistics are used.

Rejected Alternatives:

- Using CatalogTableStatistics: CatalogTableStatistics or TableStats? I lean towards TableStats, because TableStats is the class used by the planner, while CatalogTableStatistics may contain some catalog information that is not related to the planner's optimizer.

## Internal or Public

I personally lean towards internal, since these interfaces are only used by Hive and Filesystem. Another way is: SupportsParallelismReport (Internal; I haven't seen this requirement from outside) and SupportsStatisticsReport (Public; maybe the Apache Iceberg Flink connector can use it).

What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

Best,
Jingsong Lee
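
P.S. To make the parallelism proposal a bit more concrete, here is a rough sketch of how a file-based source might implement `reportParallelism()`. It is not the actual Hive or Filesystem code. `ParallelismSketch`, `FileBackedSource`, `targetBytesPerTask` and `maxParallelism` are hypothetical names, and the rules "at most one task per file" and "return -1 when no files are known" are assumptions made only for illustration. The interface is redeclared locally so the snippet compiles on its own.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ParallelismSketch {

    static final long MB = 1024L * 1024L;

    /** Local copy of the interface proposed in this thread. */
    interface SupportsParallelismReport {
        int reportParallelism();
    }

    /** Hypothetical file-backed source; NOT the real Hive/Filesystem connector. */
    static final class FileBackedSource implements SupportsParallelismReport {
        private final List<Long> fileSizesInBytes; // files left after push-down
        private final long targetBytesPerTask;     // assumed tuning knob
        private final int maxParallelism;          // assumed upper bound

        FileBackedSource(List<Long> fileSizesInBytes, long targetBytesPerTask, int maxParallelism) {
            if (targetBytesPerTask <= 0 || maxParallelism < 1) {
                throw new IllegalArgumentException("invalid limits");
            }
            this.fileSizesInBytes = fileSizesInBytes;
            this.targetBytesPerTask = targetBytesPerTask;
            this.maxParallelism = maxParallelism;
        }

        @Override
        public int reportParallelism() {
            if (fileSizesInBytes.isEmpty()) {
                return -1; // assumption: nothing known, defer to the system default
            }
            long totalBytes = 0;
            for (long size : fileSizesInBytes) {
                totalBytes += size;
            }
            // Ceiling division: number of tasks needed at the target size per task.
            long bySize = (totalBytes + targetBytesPerTask - 1) / targetBytesPerTask;
            // Assumption: files are not split, so never use more tasks than files.
            long parallelism = Math.min(bySize, fileSizesInBytes.size());
            parallelism = Math.max(1, Math.min(parallelism, maxParallelism));
            return (int) parallelism;
        }
    }

    public static void main(String[] args) {
        List<Long> small = Arrays.asList(10 * MB, 10 * MB, 10 * MB);
        List<Long> medium = Collections.nCopies(4, 256 * MB);
        List<Long> large = Collections.nCopies(5000, 256 * MB);

        System.out.println(new FileBackedSource(small, 128 * MB, 1000).reportParallelism());  // 1
        System.out.println(new FileBackedSource(medium, 128 * MB, 1000).reportParallelism()); // 4
        System.out.println(new FileBackedSource(large, 128 * MB, 1000).reportParallelism());  // 1000
    }
}
```

The point of the sketch is only that the file list seen after filter and partition push-down is what drives the number, which is why the planner would need to ask the source rather than use a global setting.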