Hi zoucao,

Regarding the pruning function, maybe a simple filter function is enough,
 e.g. `FilterFunction<CatalogPartitionSpec> partitionFilter`.

Besides, it would be better to state clearly how the new
`applyPartitionPuringFunction`
method affects batch/bounded table sources. From my understanding,
this method won't be called in batch mode?

Best,
Jark

On Mon, 4 Jul 2022 at 19:40, Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi zoucao,
>
> The FileSource does support streaming reading [1].
>
> Best regards,
>
> Martijn
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/file/src/FileSource.html
>
> Op ma 4 jul. 2022 om 05:58 schreef godfrey he <godfre...@gmail.com>:
>
> > Hi zoucao,
> >
> > Look forward your FLIP.
> >
> > >For Batch reading, the 'remainingPartitions' will be seen as the
> > partitions
> > >needed to consume, for streaming reading, we use the
> > >'partitionPruningFunction' to ignore the unneeded partitions.
> > There should be for bounded source(maybe batch or streaming),
> > `applyPartitions` should be used,
> > while only for unbounded source, `applyPartitionPuringFunction` can be
> > used.
> >
> > Best,
> > Godfrey
> >
> > cao zou <zoucao...@gmail.com> 于2022年7月4日周一 11:04写道:
> > >
> > > Hi Martijn, thanks for your attention, I'm glad to create a FLIP, and
> > could
> > > you help give me the permission?
> > > My Id is zoucao, and my mail is zoucao...@gmail.com.
> > >
> > > The implications for FileSource
> > >
> > > In the above discussion, only HiveSource has been involved, because it
> > > holds a continuous partition fetcher, but FileSource not. If we do the
> > > streaming pruning only in the partition fetcher, it will not affect the
> > > FileSource. If the FileSource supports streaming reading in the future,
> > the
> > > same changes can be applied to it.
> > >
> > > Best regards,
> > > zoucao
> > >
> > > Martijn Visser <martijnvis...@apache.org> 于2022年7月1日周五 16:20写道:
> > >
> > > > Hi zoucao,
> > > >
> > > > I think this topic deserves a proper FLIP and a vote. This approach
> is
> > > > focussed only on Hive, but I would also like to understand the
> > implications
> > > > for FileSource. Can you create one?
> > > >
> > > > Best regards,
> > > >
> > > > Martijn
> > > >
> > > > Op wo 22 jun. 2022 om 18:50 schreef cao zou <zoucao...@gmail.com>:
> > > >
> > > > > Hi devs, I want to start a discussion to find a way to support
> > partition
> > > > > pruning for streaming reading.
> > > > >
> > > > >
> > > > > Now, Flink has supported the partition pruning, the implementation
> > > > consists
> > > > > of *Source Ability*, *Logical Rule*, and the interface
> > > > > *SupportsPartitionPushDown*, but they all only take effect in batch
> > > > > reading. When reading a table in streaming mode, the existing
> > mechanism
> > > > > will cause some problems posted by FLINK-27898
> > > > > <https://issues.apache.org/jira/browse/FLINK-27898>[1], and the
> > records
> > > > > that should be filtered will be sent downstream.
> > > > >
> > > > > To solve this drawback, this discussion is proposed, and the Hive
> and
> > > > other
> > > > > BigData systems stored with partitions will benefit more from it.
> > > > >
> > > > >  Now, the existing partitions which are needed to consume will be
> > > > generated
> > > > > in *PushPartitionIntoTableSourceScanRule*. Then, the partitions
> will
> > be
> > > > > pushed into TableSource. It’s working well in batch mode, but if we
> > want
> > > > to
> > > > > read records from Hive in streaming mode, and consider the
> partitions
> > > > > committed in the future, it’s not enough.
> > > > >
> > > > > To support pruning the partitions committed in the feature, the
> > pruning
> > > > > function should be pushed into the TableSource, and then delivered
> to
> > > > > *ContinuousPartitionFetcher*, such that the pruning for uncommitted
> > > > > partitions can be invoked here.
> > > > >
> > > > > Before proposing the changes, I think it is necessary to clarify
> the
> > > > > existing pruning logic. The main logic of the pruning in
> > > > > *PushPartitionIntoTableSourceScanRule* is as follows.
> > > > >
> > > > > Firstly, generating a pruning function called partitionPruner, the
> > > > function
> > > > > is extended from a RichMapFunction<GenericRowData, Boolean>.
> > > > >
> > > > >
> > > > > if tableSource.listPartitions() is not empty:
> > > > >   partitions = dynamicTableSource.listPartitions()
> > > > >
> > > > >   for p in partitions:
> > > > >     boolean predicate =
> partitionPruner.map(convertPartitionToRow(p))
> > > > >
> > > > >     add p to partitionsAfterPruning where the predicate is true.
> > > > >
> > > > > else  tableSource.listPartitions() is empty:
> > > > >   if the filter can be converted to ResolvedExpression &&
> > > > >     the catalog can support the filter :
> > > > >
> > > > >     partitionsAfterPruning = catalog.listPartitionsByFilter()
> > > > >
> > > > >     the value of partitionsAfterPruning is all needed.
> > > > >   else :
> > > > >
> > > > >     partitions = catalog.listPartitions()
> > > > >     for p in partitions:
> > > > >     boolean predicate =
> partitionPruner.map(convertPartitionToRow(p))
> > > > >
> > > > >      add p to partitionsAfterPruning where the predicate is true.
> > > > >
> > > > > I think the main logic can be classified into two sides, one exists
> > in
> > > > the
> > > > > logical rule, and the other exists in the connector side. The
> catalog
> > > > info
> > > > > should be used on the rule side, and not on the connector side, the
> > > > pruning
> > > > > function could be used on both of them or unified on the connector
> > side.
> > > > >
> > > > >
> > > > > Proposed changes
> > > > >
> > > > >
> > > > >    - add a new method in SupportsPartitionPushDown
> > > > >    - let HiveSourceTable, HiveSourceBuilder, and
> > > > >    HiveContinuousPartitionFetcher hold the pruning function.
> > > > >    - pruning after fetchPartitions invoked.
> > > > >
> > > > > Considering the version compatibility and the optimization for the
> > method
> > > > > of listing partitions with filter in the catalog, I think we can
> add
> > a
> > > > new
> > > > > method in *SupportsPartitionPushDown*
> > > > >
> > > > > /**
> > > > > * Provides a list of remaining partitions. After those partitions
> are
> > > > > applied, a source must
> > > > > * not read the data of other partitions during runtime.
> > > > > *
> > > > > * <p>See the documentation of {@link SupportsPartitionPushDown} for
> > more
> > > > > information.
> > > > > */
> > > > > void applyPartitions(List<Map<String, String>>
> remainingPartitions);
> > > > >
> > > > > /**
> > > > > * Provides a pruning function for uncommitted partitions.
> > > > > */
> > > > > default void applyPartitionPuringFunction(MapFunction<RowData,
> > Boolean>
> > > > > partitionPruningFunction) { }
> > > > >
> > > > > We can push the generated function into TableSource, such that the
> > > > > ContinuousPartitionFetcher can get it.
> > > > >
> > > > > For Batch reading, the 'remainingPartitions' will be seen as the
> > > > partitions
> > > > > needed to consume, for streaming reading, we use the
> > > > > 'partitionPruningFunction' to ignore the unneeded partitions.
> > > > > Rejected Alternatives
> > > > >
> > > > > Do not remove the filter logic in Filter Node about the partition
> > keys,
> > > > if
> > > > > the source will execute streaming reading.
> > > > >
> > > > >
> > > > > Looking forward to your opinions.
> > > > >
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-27898
> > > > >
> > > > > best
> > > > >
> > > > > zoucao
> > > > >
> > > >
> >
>

Reply via email to