Hi zoucao,

Look forward your FLIP.

>For Batch reading, the 'remainingPartitions' will be seen as the partitions
>needed to consume, for streaming reading, we use the
>'partitionPruningFunction' to ignore the unneeded partitions.
There should be for bounded source(maybe batch or streaming),
`applyPartitions` should be used,
while only for unbounded source, `applyPartitionPuringFunction` can be used.

Best,
Godfrey

cao zou <zoucao...@gmail.com> 于2022年7月4日周一 11:04写道:
>
> Hi Martijn, thanks for your attention, I'm glad to create a FLIP, and could
> you help give me the permission?
> My Id is zoucao, and my mail is zoucao...@gmail.com.
>
> The implications for FileSource
>
> In the above discussion, only HiveSource has been involved, because it
> holds a continuous partition fetcher, but FileSource not. If we do the
> streaming pruning only in the partition fetcher, it will not affect the
> FileSource. If the FileSource supports streaming reading in the future, the
> same changes can be applied to it.
>
> Best regards,
> zoucao
>
> Martijn Visser <martijnvis...@apache.org> 于2022年7月1日周五 16:20写道:
>
> > Hi zoucao,
> >
> > I think this topic deserves a proper FLIP and a vote. This approach is
> > focussed only on Hive, but I would also like to understand the implications
> > for FileSource. Can you create one?
> >
> > Best regards,
> >
> > Martijn
> >
> > Op wo 22 jun. 2022 om 18:50 schreef cao zou <zoucao...@gmail.com>:
> >
> > > Hi devs, I want to start a discussion to find a way to support partition
> > > pruning for streaming reading.
> > >
> > >
> > > Now, Flink has supported the partition pruning, the implementation
> > consists
> > > of *Source Ability*, *Logical Rule*, and the interface
> > > *SupportsPartitionPushDown*, but they all only take effect in batch
> > > reading. When reading a table in streaming mode, the existing mechanism
> > > will cause some problems posted by FLINK-27898
> > > <https://issues.apache.org/jira/browse/FLINK-27898>[1], and the records
> > > that should be filtered will be sent downstream.
> > >
> > > To solve this drawback, this discussion is proposed, and the Hive and
> > other
> > > BigData systems stored with partitions will benefit more from it.
> > >
> > >  Now, the existing partitions which are needed to consume will be
> > generated
> > > in *PushPartitionIntoTableSourceScanRule*. Then, the partitions will be
> > > pushed into TableSource. It’s working well in batch mode, but if we want
> > to
> > > read records from Hive in streaming mode, and consider the partitions
> > > committed in the future, it’s not enough.
> > >
> > > To support pruning the partitions committed in the feature, the pruning
> > > function should be pushed into the TableSource, and then delivered to
> > > *ContinuousPartitionFetcher*, such that the pruning for uncommitted
> > > partitions can be invoked here.
> > >
> > > Before proposing the changes, I think it is necessary to clarify the
> > > existing pruning logic. The main logic of the pruning in
> > > *PushPartitionIntoTableSourceScanRule* is as follows.
> > >
> > > Firstly, generating a pruning function called partitionPruner, the
> > function
> > > is extended from a RichMapFunction<GenericRowData, Boolean>.
> > >
> > >
> > > if tableSource.listPartitions() is not empty:
> > >   partitions = dynamicTableSource.listPartitions()
> > >
> > >   for p in partitions:
> > >     boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> > >
> > >     add p to partitionsAfterPruning where the predicate is true.
> > >
> > > else  tableSource.listPartitions() is empty:
> > >   if the filter can be converted to ResolvedExpression &&
> > >     the catalog can support the filter :
> > >
> > >     partitionsAfterPruning = catalog.listPartitionsByFilter()
> > >
> > >     the value of partitionsAfterPruning is all needed.
> > >   else :
> > >
> > >     partitions = catalog.listPartitions()
> > >     for p in partitions:
> > >     boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> > >
> > >      add p to partitionsAfterPruning where the predicate is true.
> > >
> > > I think the main logic can be classified into two sides, one exists in
> > the
> > > logical rule, and the other exists in the connector side. The catalog
> > info
> > > should be used on the rule side, and not on the connector side, the
> > pruning
> > > function could be used on both of them or unified on the connector side.
> > >
> > >
> > > Proposed changes
> > >
> > >
> > >    - add a new method in SupportsPartitionPushDown
> > >    - let HiveSourceTable, HiveSourceBuilder, and
> > >    HiveContinuousPartitionFetcher hold the pruning function.
> > >    - pruning after fetchPartitions invoked.
> > >
> > > Considering the version compatibility and the optimization for the method
> > > of listing partitions with filter in the catalog, I think we can add a
> > new
> > > method in *SupportsPartitionPushDown*
> > >
> > > /**
> > > * Provides a list of remaining partitions. After those partitions are
> > > applied, a source must
> > > * not read the data of other partitions during runtime.
> > > *
> > > * <p>See the documentation of {@link SupportsPartitionPushDown} for more
> > > information.
> > > */
> > > void applyPartitions(List<Map<String, String>> remainingPartitions);
> > >
> > > /**
> > > * Provides a pruning function for uncommitted partitions.
> > > */
> > > default void applyPartitionPuringFunction(MapFunction<RowData, Boolean>
> > > partitionPruningFunction) { }
> > >
> > > We can push the generated function into TableSource, such that the
> > > ContinuousPartitionFetcher can get it.
> > >
> > > For Batch reading, the 'remainingPartitions' will be seen as the
> > partitions
> > > needed to consume, for streaming reading, we use the
> > > 'partitionPruningFunction' to ignore the unneeded partitions.
> > > Rejected Alternatives
> > >
> > > Do not remove the filter logic in Filter Node about the partition keys,
> > if
> > > the source will execute streaming reading.
> > >
> > >
> > > Looking forward to your opinions.
> > >
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-27898
> > >
> > > best
> > >
> > > zoucao
> > >
> >

Reply via email to