Hi devs, I want to start a discussion to find a way to support partition pruning for streaming reading.
Currently, Flink supports partition pruning via the *Source Ability* abstraction, a *Logical Rule*, and the *SupportsPartitionPushDown* interface, but they only take effect in batch reading. When reading a table in streaming mode, the existing mechanism causes the problems reported in FLINK-27898 <https://issues.apache.org/jira/browse/FLINK-27898>[1]: records that should be filtered out are sent downstream. This discussion is proposed to solve that drawback, and Hive and other big-data systems that store data in partitions will benefit most from it.

Today, the partitions that need to be consumed are computed in *PushPartitionIntoTableSourceScanRule* and then pushed into the TableSource. This works well in batch mode, but it is not enough if we want to read records from Hive in streaming mode and also take into account partitions committed in the future. To support pruning such future partitions, the pruning function itself should be pushed into the TableSource and then delivered to *ContinuousPartitionFetcher*, so that pruning for not-yet-committed partitions can be invoked there.

Before proposing the changes, I think it is necessary to clarify the existing pruning logic. The main logic in *PushPartitionIntoTableSourceScanRule* is as follows. First, it generates a pruning function called partitionPruner, which extends RichMapFunction<GenericRowData, Boolean>. Then:

    if tableSource.listPartitions() is not empty:
        partitions = dynamicTableSource.listPartitions()
        for p in partitions:
            boolean predicate = partitionPruner.map(convertPartitionToRow(p))
            add p to partitionsAfterPruning if predicate is true
    else:  # tableSource.listPartitions() is empty
        if the filter can be converted to a ResolvedExpression
                && the catalog supports the filter:
            partitionsAfterPruning = catalog.listPartitionsByFilter()
            # all returned partitions are needed
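To make the pruning loop above concrete, here is a minimal, Flink-free sketch. In the real rule the pruner is a RichMapFunction<GenericRowData, Boolean> applied to convertPartitionToRow(p); to keep the sketch self-contained, a plain java.util.function.Predicate over the partition spec (a Map of partition key to value) stands in for it, and the class and partition values are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of the batch-time pruning loop in PushPartitionIntoTableSourceScanRule.
// A Predicate over the partition spec stands in for the generated partitionPruner.
public class PartitionPruningSketch {

    static List<Map<String, String>> prune(
            List<Map<String, String>> partitions,
            Predicate<Map<String, String>> partitionPruner) {
        List<Map<String, String>> partitionsAfterPruning = new ArrayList<>();
        for (Map<String, String> p : partitions) {
            // keep only the partitions for which the predicate holds
            if (partitionPruner.test(p)) {
                partitionsAfterPruning.add(p);
            }
        }
        return partitionsAfterPruning;
    }

    public static void main(String[] args) {
        List<Map<String, String>> partitions = new ArrayList<>();
        Map<String, String> p1 = new LinkedHashMap<>();
        p1.put("dt", "2022-06-01");
        Map<String, String> p2 = new LinkedHashMap<>();
        p2.put("dt", "2022-06-02");
        partitions.add(p1);
        partitions.add(p2);

        // hypothetical filter: dt >= '2022-06-02'
        List<Map<String, String>> remaining =
                prune(partitions, p -> p.get("dt").compareTo("2022-06-02") >= 0);

        System.out.println(remaining.size());           // 1
        System.out.println(remaining.get(0).get("dt")); // 2022-06-02
    }
}
```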
        else:
            partitions = catalog.listPartitions()
            for p in partitions:
                boolean predicate = partitionPruner.map(convertPartitionToRow(p))
                add p to partitionsAfterPruning if predicate is true

I think the main logic can be split into two sides: one lives in the logical rule, and the other lives on the connector side. The catalog info should be used on the rule side only, not on the connector side; the pruning function could be used on both sides, or unified on the connector side.

Proposed changes
- Add a new method in *SupportsPartitionPushDown*.
- Let HiveSourceTable, HiveSourceBuilder, and HiveContinuousPartitionFetcher hold the pruning function.
- Prune after fetchPartitions is invoked.

Considering version compatibility and the optimization of listing partitions with a filter in the catalog, I think we can add a new method to *SupportsPartitionPushDown*:

    /**
     * Provides a list of remaining partitions. After those partitions are applied, a source must
     * not read the data of other partitions during runtime.
     *
     * <p>See the documentation of {@link SupportsPartitionPushDown} for more information.
     */
    void applyPartitions(List<Map<String, String>> remainingPartitions);

    /**
     * Provides a pruning function for uncommitted partitions.
     */
    default void applyPartitionPruningFunction(MapFunction<RowData, Boolean> partitionPruningFunction) {
    }

We can push the generated function into the TableSource so that the ContinuousPartitionFetcher can get it. For batch reading, 'remainingPartitions' is treated as the set of partitions to consume; for streaming reading, we use 'partitionPruningFunction' to ignore the unneeded partitions.

Rejected Alternatives
- Keep the filter logic about the partition keys in the Filter node (instead of removing it) if the source will execute streaming reading.

Looking forward to your opinions.

[1] https://issues.apache.org/jira/browse/FLINK-27898

best,
zoucao
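P.S. To illustrate the proposed connector-side behavior, here is a small, Flink-free sketch of how a continuous fetcher could apply the pushed-down pruning function after fetchPartitions is invoked. The PartitionFetcher interface, the withPruning wrapper, and the partition values are all hypothetical stand-ins for HiveContinuousPartitionFetcher and the MapFunction<RowData, Boolean>, not the actual Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch: prune newly fetched (possibly just-committed) partitions on the
// connector side, after fetchPartitions() returns.
public class FetcherPruningSketch {

    /** Hypothetical stand-in for ContinuousPartitionFetcher: lists committed partitions. */
    interface PartitionFetcher {
        List<Map<String, String>> fetchPartitions();
    }

    /** Wraps a fetcher so every fetched batch is pruned before it is handed downstream. */
    static PartitionFetcher withPruning(
            PartitionFetcher delegate, Predicate<Map<String, String>> pruningFunction) {
        return () -> {
            List<Map<String, String>> pruned = new ArrayList<>();
            for (Map<String, String> p : delegate.fetchPartitions()) {
                if (pruningFunction.test(p)) {
                    pruned.add(p);
                }
            }
            return pruned;
        };
    }

    public static void main(String[] args) {
        // hypothetical fetcher returning two newly committed partitions
        PartitionFetcher raw = () -> Arrays.asList(
                Map.of("region", "eu"), Map.of("region", "us"));
        // the pushed-down pruning function only accepts the 'eu' partition
        PartitionFetcher pruning = withPruning(raw, p -> "eu".equals(p.get("region")));
        System.out.println(pruning.fetchPartitions().size()); // 1
    }
}
```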