Hi zoucao,

I think this topic deserves a proper FLIP and a vote. This approach is focused only on Hive, but I would also like to understand the implications for the FileSource. Can you create one?
Best regards,

Martijn

On Wed, 22 Jun 2022 at 18:50, cao zou <zoucao...@gmail.com> wrote:

> Hi devs, I want to start a discussion to find a way to support partition
> pruning for streaming reading.
>
> Flink already supports partition pruning; the implementation consists of
> the *Source Ability*, a *Logical Rule*, and the interface
> *SupportsPartitionPushDown*, but they only take effect in batch reading.
> When reading a table in streaming mode, the existing mechanism causes the
> problems reported in FLINK-27898
> <https://issues.apache.org/jira/browse/FLINK-27898> [1]: records that
> should be filtered out are sent downstream.
>
> To solve this drawback, this discussion is proposed; Hive and other big
> data systems that store data in partitions will benefit from it.
>
> Currently, the partitions to consume are computed in
> *PushPartitionIntoTableSourceScanRule* and then pushed into the
> TableSource. This works well in batch mode, but it is not enough when
> reading records from Hive in streaming mode, where partitions committed
> in the future must also be considered.
>
> To support pruning partitions committed in the future, the pruning
> function should be pushed into the TableSource and then delivered to the
> *ContinuousPartitionFetcher*, so that uncommitted partitions can be
> pruned there.
>
> Before proposing the changes, it is necessary to clarify the existing
> pruning logic. The main logic of the pruning in
> *PushPartitionIntoTableSourceScanRule* is as follows.
>
> First, a pruning function called partitionPruner is generated; it extends
> RichMapFunction<GenericRowData, Boolean>.
>
> if tableSource.listPartitions() is not empty:
>     partitions = tableSource.listPartitions()
>     for p in partitions:
>         boolean predicate = partitionPruner.map(convertPartitionToRow(p))
>         add p to partitionsAfterPruning where the predicate is true.
> else (tableSource.listPartitions() is empty):
>     if the filter can be converted to a ResolvedExpression &&
>        the catalog supports the filter:
>         partitionsAfterPruning = catalog.listPartitionsByFilter()
>         all of partitionsAfterPruning is needed.
>     else:
>         partitions = catalog.listPartitions()
>         for p in partitions:
>             boolean predicate = partitionPruner.map(convertPartitionToRow(p))
>             add p to partitionsAfterPruning where the predicate is true.
>
> The main logic can be classified into two sides: one part exists in the
> logical rule, and the other on the connector side. The catalog info
> should be used on the rule side, not on the connector side; the pruning
> function could be used on both of them, or unified on the connector side.
>
> Proposed changes
>
> - Add a new method to SupportsPartitionPushDown.
> - Let HiveSourceTable, HiveSourceBuilder, and
>   HiveContinuousPartitionFetcher hold the pruning function.
> - Prune after fetchPartitions is invoked.
>
> Considering version compatibility and the optimization of listing
> partitions with a filter in the catalog, I think we can add a new method
> to *SupportsPartitionPushDown*:
>
> /**
>  * Provides a list of remaining partitions. After those partitions are
>  * applied, a source must not read the data of other partitions during
>  * runtime.
>  *
>  * <p>See the documentation of {@link SupportsPartitionPushDown} for more
>  * information.
>  */
> void applyPartitions(List<Map<String, String>> remainingPartitions);
>
> /**
>  * Provides a pruning function for uncommitted partitions.
>  */
> default void applyPartitionPruningFunction(
>         MapFunction<RowData, Boolean> partitionPruningFunction) {}
>
> We can push the generated function into the TableSource, so that the
> ContinuousPartitionFetcher can get it.
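The two-branch pruning logic quoted above can be sketched in plain Java. This is only an illustration: the real planner pruner is a RichMapFunction<GenericRowData, Boolean> over a converted partition row, which is modeled here as a simple Predicate over the partition spec; the class and method names are hypothetical, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PruningSketch {

    // Stand-in for the planner-generated partitionPruner: in Flink it is a
    // RichMapFunction<GenericRowData, Boolean> applied to
    // convertPartitionToRow(p); here it is a Predicate over the raw
    // partition spec (Map of partition key -> value) for illustration.
    public static List<Map<String, String>> prune(
            List<Map<String, String>> partitions,
            Predicate<Map<String, String>> partitionPruner) {
        List<Map<String, String>> partitionsAfterPruning = new ArrayList<>();
        for (Map<String, String> p : partitions) {
            // Keep p only where the predicate evaluates to true.
            if (partitionPruner.test(p)) {
                partitionsAfterPruning.add(p);
            }
        }
        return partitionsAfterPruning;
    }

    public static void main(String[] args) {
        List<Map<String, String>> partitions = List.of(
                Map.of("dt", "2022-06-21"),
                Map.of("dt", "2022-06-22"));
        // Example filter: keep only partitions with dt >= 2022-06-22.
        List<Map<String, String>> remaining =
                prune(partitions, p -> p.get("dt").compareTo("2022-06-22") >= 0);
        System.out.println(remaining.size()); // prints 1
    }
}
```

The same loop serves both branches of the rule: when the catalog cannot evaluate the filter itself, every listed partition is passed through the predicate one by one.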
> For batch reading, 'remainingPartitions' is treated as the set of
> partitions to consume; for streaming reading, the
> 'partitionPruningFunction' is used to ignore the unneeded partitions.
>
> Rejected Alternatives
>
> Keep the filter logic about the partition keys in the Filter node when
> the source performs streaming reading.
>
> Looking forward to your opinions.
>
> [1] https://issues.apache.org/jira/browse/FLINK-27898
>
> Best,
>
> zoucao
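To make the batch/streaming split concrete, here is a minimal sketch of a table source that stores 'remainingPartitions' for the batch path and forwards the pruning function toward the fetcher for the streaming path. All names here are hypothetical stand-ins (not HiveSourceTable or HiveContinuousPartitionFetcher), and the pruning function is again modeled as a Predicate over the partition spec rather than Flink's MapFunction<RowData, Boolean>.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SketchTableSource {

    // Batch path: the rule computes the final partition list up front.
    private List<Map<String, String>> remainingPartitions;

    // Streaming path: default to "keep everything" until a function is pushed.
    private Predicate<Map<String, String>> pruningFunction = p -> true;

    // Mirrors SupportsPartitionPushDown#applyPartitions.
    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        this.remainingPartitions = remainingPartitions;
    }

    // Mirrors the proposed default method: the source keeps the function
    // so it can hand it to the continuous partition fetcher.
    public void applyPartitionPruningFunction(
            Predicate<Map<String, String>> pruningFunction) {
        this.pruningFunction = pruningFunction;
    }

    // Streaming read: every newly committed partition passes through the
    // pruning function before any splits are generated, so records from
    // filtered partitions never reach downstream operators.
    public List<Map<String, String>> onNewPartitions(
            List<Map<String, String>> committed) {
        return committed.stream()
                .filter(pruningFunction)
                .collect(Collectors.toList());
    }

    public List<Map<String, String>> getRemainingPartitions() {
        return remainingPartitions;
    }
}
```

In this shape, pruning the partitions committed in the future reduces to calling onNewPartitions after each fetchPartitions cycle, which is the step the proposal places inside the ContinuousPartitionFetcher.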