Hi zoucao,

The FileSource does support streaming reading [1].
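For reference, a minimal sketch of continuous (streaming) reading with the Flink 1.15 FileSource builder; the input path and discovery interval are placeholders:

```java
import java.time.Duration;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

// Build a FileSource that keeps monitoring the directory for new files,
// i.e. an unbounded (streaming) read instead of a one-shot bounded scan.
FileSource<String> source =
        FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(), new Path("/tmp/input"))
                // without this call the source is bounded (batch mode)
                .monitorContinuously(Duration.ofSeconds(10))
                .build();
```

The fragment above needs a Flink runtime on the classpath; it only illustrates that streaming vs. batch is decided by the `monitorContinuously` call on the builder.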
Best regards,

Martijn

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/file/src/FileSource.html

On Mon, Jul 4, 2022 at 05:58, godfrey he <godfre...@gmail.com> wrote:

> Hi zoucao,
>
> Looking forward to your FLIP.
>
> > For Batch reading, the 'remainingPartitions' will be seen as the
> > partitions needed to consume; for streaming reading, we use the
> > 'partitionPruningFunction' to ignore the unneeded partitions.
>
> It should rather be: for a bounded source (whether batch or streaming),
> `applyPartitions` should be used, while only for an unbounded source can
> `applyPartitionPruningFunction` be used.
>
> Best,
> Godfrey
>
> On Mon, Jul 4, 2022 at 11:04, cao zou <zoucao...@gmail.com> wrote:
>
> > Hi Martijn, thanks for your attention. I would be glad to create a
> > FLIP; could you help give me the permission?
> > My ID is zoucao, and my mail is zoucao...@gmail.com.
> >
> > The implications for FileSource
> >
> > In the above discussion, only HiveSource has been involved, because it
> > holds a continuous partition fetcher, while FileSource does not. If we
> > do the streaming pruning only in the partition fetcher, it will not
> > affect FileSource. If FileSource supports streaming reading in the
> > future, the same changes can be applied to it.
> >
> > Best regards,
> > zoucao
> >
> > On Fri, Jul 1, 2022 at 16:20, Martijn Visser <martijnvis...@apache.org> wrote:
> >
> > > Hi zoucao,
> > >
> > > I think this topic deserves a proper FLIP and a vote. This approach
> > > is focused only on Hive, but I would also like to understand the
> > > implications for FileSource. Can you create one?
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Wed, Jun 22, 2022 at 18:50, cao zou <zoucao...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > I want to start a discussion to find a way to support partition
> > > > pruning for streaming reading.
> > > > Flink already supports partition pruning; the implementation
> > > > consists of a *source ability*, a *logical rule*, and the interface
> > > > *SupportsPartitionPushDown*, but they all take effect only in batch
> > > > reading. When reading a table in streaming mode, the existing
> > > > mechanism will cause the problems reported in FLINK-27898
> > > > <https://issues.apache.org/jira/browse/FLINK-27898> [1], and
> > > > records that should be filtered out will be sent downstream.
> > > >
> > > > This discussion is proposed to solve that drawback; Hive and other
> > > > big-data systems that store data in partitions will benefit from
> > > > it.
> > > >
> > > > Currently, the partitions that need to be consumed are determined
> > > > in *PushPartitionIntoTableSourceScanRule* and then pushed into the
> > > > TableSource. This works well in batch mode, but it is not enough if
> > > > we want to read records from Hive in streaming mode and also
> > > > consider partitions committed in the future.
> > > >
> > > > To support pruning the partitions committed in the future, the
> > > > pruning function should be pushed into the TableSource and then
> > > > delivered to the *ContinuousPartitionFetcher*, such that the
> > > > pruning of uncommitted partitions can be invoked there.
> > > >
> > > > Before proposing the changes, I think it is necessary to clarify
> > > > the existing pruning logic. The main logic of the pruning in
> > > > *PushPartitionIntoTableSourceScanRule* is as follows.
> > > >
> > > > First, a pruning function called partitionPruner is generated; it
> > > > extends RichMapFunction<GenericRowData, Boolean>.
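As a self-contained illustration of the pruning loop sketched below, here is a minimal stand-alone version. It uses `java.util.function.Function` as a stand-in for Flink's `RichMapFunction<GenericRowData, Boolean>`, and a partition spec (`Map<String, String>`, e.g. `{dt=2022-07-04}`) as a stand-in for the row produced by `convertPartitionToRow`; all names are illustrative, not actual Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified sketch of the partition-pruning step: keep only the
// partitions for which the pushed-down pruning function returns true.
public class PartitionPruningSketch {

    public static List<Map<String, String>> prune(
            List<Map<String, String>> partitions,
            Function<Map<String, String>, Boolean> partitionPruner) {
        List<Map<String, String>> partitionsAfterPruning = new ArrayList<>();
        for (Map<String, String> p : partitions) {
            // In Flink this would be partitionPruner.map(convertPartitionToRow(p)).
            if (partitionPruner.apply(p)) {
                partitionsAfterPruning.add(p);
            }
        }
        return partitionsAfterPruning;
    }
}
```

The same predicate that the rule evaluates eagerly over `listPartitions()` in batch mode is exactly what the proposal wants to hand to the fetcher for partitions that do not exist yet.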
> > > > if tableSource.listPartitions() is not empty:
> > > >     partitions = dynamicTableSource.listPartitions()
> > > >     for p in partitions:
> > > >         boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> > > >         add p to partitionsAfterPruning where the predicate is true
> > > > else:  # tableSource.listPartitions() is empty
> > > >     if the filter can be converted to a ResolvedExpression &&
> > > >             the catalog can support the filter:
> > > >         partitionsAfterPruning = catalog.listPartitionsByFilter()
> > > >         # all partitions in partitionsAfterPruning are needed
> > > >     else:
> > > >         partitions = catalog.listPartitions()
> > > >         for p in partitions:
> > > >             boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> > > >             add p to partitionsAfterPruning where the predicate is true
> > > >
> > > > I think the main logic can be split into two sides: one in the
> > > > logical rule and the other on the connector side. The catalog info
> > > > should be used on the rule side and not on the connector side; the
> > > > pruning function could be used on both of them or unified on the
> > > > connector side.
> > > >
> > > > Proposed changes
> > > >
> > > > - add a new method to SupportsPartitionPushDown
> > > > - let HiveSourceTable, HiveSourceBuilder, and
> > > >   HiveContinuousPartitionFetcher hold the pruning function
> > > > - prune after fetchPartitions is invoked
> > > >
> > > > Considering version compatibility and the optimization for the
> > > > method of listing partitions with a filter in the catalog, I think
> > > > we can add a new method to *SupportsPartitionPushDown*:
> > > >
> > > > /**
> > > >  * Provides a list of remaining partitions. After those partitions are
> > > >  * applied, a source must not read the data of other partitions during
> > > >  * runtime.
> > > >  *
> > > >  * <p>See the documentation of {@link SupportsPartitionPushDown} for
> > > >  * more information.
> > > >  */
> > > > void applyPartitions(List<Map<String, String>> remainingPartitions);
> > > >
> > > > /**
> > > >  * Provides a pruning function for uncommitted partitions.
> > > >  */
> > > > default void applyPartitionPruningFunction(
> > > >         MapFunction<RowData, Boolean> partitionPruningFunction) {}
> > > >
> > > > We can push the generated function into the TableSource, such that
> > > > the ContinuousPartitionFetcher can get it.
> > > >
> > > > For batch reading, the 'remainingPartitions' will be seen as the
> > > > partitions that need to be consumed; for streaming reading, we use
> > > > the 'partitionPruningFunction' to ignore the unneeded partitions.
> > > >
> > > > Rejected Alternatives
> > > >
> > > > Do not remove the filter logic about the partition keys from the
> > > > Filter node if the source will execute streaming reading.
> > > >
> > > > Looking forward to your opinions.
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-27898
> > > >
> > > > best
> > > >
> > > > zoucao
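To make the proposed wiring concrete, here is a hypothetical, heavily simplified sketch (none of these classes are actual Flink code): the planner pushes the pruning function into the table source via the proposed default method, the source forwards it to its continuous fetcher, and the fetcher applies it to every newly fetched, possibly uncommitted partition:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class StreamingPruningSketch {

    // Stand-in for HiveContinuousPartitionFetcher.
    public static class ContinuousFetcher {
        // Default: keep everything, so behavior is unchanged when no
        // pruning function was pushed down.
        private Function<Map<String, String>, Boolean> pruner = p -> true;

        void setPruningFunction(Function<Map<String, String>, Boolean> pruner) {
            this.pruner = pruner;
        }

        // Prune right after the partitions are fetched, as proposed above.
        public List<Map<String, String>> fetchPartitions(
                List<Map<String, String>> discovered) {
            List<Map<String, String>> remaining = new ArrayList<>();
            for (Map<String, String> p : discovered) {
                if (pruner.apply(p)) {
                    remaining.add(p);
                }
            }
            return remaining;
        }
    }

    // Stand-in for a table source implementing SupportsPartitionPushDown.
    public static class TableSourceStub {
        public final ContinuousFetcher fetcher = new ContinuousFetcher();

        // Corresponds to the proposed default method
        // applyPartitionPruningFunction(MapFunction<RowData, Boolean>).
        public void applyPartitionPruningFunction(
                Function<Map<String, String>, Boolean> f) {
            fetcher.setPruningFunction(f);
        }
    }
}
```

The design point this illustrates: because the function (not a fixed partition list) travels to the fetcher, partitions committed after job start are still pruned, which a one-shot `remainingPartitions` list cannot do.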