Hi zoucao,

The FileSource does support streaming reading [1].

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/connector/file/src/FileSource.html

Op ma 4 jul. 2022 om 05:58 schreef godfrey he <godfre...@gmail.com>:

> Hi zoucao,
>
> Look forward your FLIP.
>
> >For Batch reading, the 'remainingPartitions' will be seen as the
> partitions
> >needed to consume, for streaming reading, we use the
> >'partitionPruningFunction' to ignore the unneeded partitions.
> There should be for bounded source(maybe batch or streaming),
> `applyPartitions` should be used,
> while only for unbounded source, `applyPartitionPuringFunction` can be
> used.
>
> Best,
> Godfrey
>
> cao zou <zoucao...@gmail.com> 于2022年7月4日周一 11:04写道:
> >
> > Hi Martijn, thanks for your attention, I'm glad to create a FLIP, and
> could
> > you help give me the permission?
> > My Id is zoucao, and my mail is zoucao...@gmail.com.
> >
> > The implications for FileSource
> >
> > In the above discussion, only HiveSource has been involved, because it
> > holds a continuous partition fetcher, but FileSource not. If we do the
> > streaming pruning only in the partition fetcher, it will not affect the
> > FileSource. If the FileSource supports streaming reading in the future,
> the
> > same changes can be applied to it.
> >
> > Best regards,
> > zoucao
> >
> > Martijn Visser <martijnvis...@apache.org> 于2022年7月1日周五 16:20写道:
> >
> > > Hi zoucao,
> > >
> > > I think this topic deserves a proper FLIP and a vote. This approach is
> > > focussed only on Hive, but I would also like to understand the
> implications
> > > for FileSource. Can you create one?
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > Op wo 22 jun. 2022 om 18:50 schreef cao zou <zoucao...@gmail.com>:
> > >
> > > > Hi devs, I want to start a discussion to find a way to support
> partition
> > > > pruning for streaming reading.
> > > >
> > > >
> > > > Now, Flink has supported the partition pruning, the implementation
> > > consists
> > > > of *Source Ability*, *Logical Rule*, and the interface
> > > > *SupportsPartitionPushDown*, but they all only take effect in batch
> > > > reading. When reading a table in streaming mode, the existing
> mechanism
> > > > will cause some problems posted by FLINK-27898
> > > > <https://issues.apache.org/jira/browse/FLINK-27898>[1], and the
> records
> > > > that should be filtered will be sent downstream.
> > > >
> > > > To solve this drawback, this discussion is proposed, and the Hive and
> > > other
> > > > BigData systems stored with partitions will benefit more from it.
> > > >
> > > >  Now, the existing partitions which are needed to consume will be
> > > generated
> > > > in *PushPartitionIntoTableSourceScanRule*. Then, the partitions will
> be
> > > > pushed into TableSource. It’s working well in batch mode, but if we
> want
> > > to
> > > > read records from Hive in streaming mode, and consider the partitions
> > > > committed in the future, it’s not enough.
> > > >
> > > > To support pruning the partitions committed in the feature, the
> pruning
> > > > function should be pushed into the TableSource, and then delivered to
> > > > *ContinuousPartitionFetcher*, such that the pruning for uncommitted
> > > > partitions can be invoked here.
> > > >
> > > > Before proposing the changes, I think it is necessary to clarify the
> > > > existing pruning logic. The main logic of the pruning in
> > > > *PushPartitionIntoTableSourceScanRule* is as follows.
> > > >
> > > > Firstly, generating a pruning function called partitionPruner, the
> > > function
> > > > is extended from a RichMapFunction<GenericRowData, Boolean>.
> > > >
> > > >
> > > > if tableSource.listPartitions() is not empty:
> > > >   partitions = dynamicTableSource.listPartitions()
> > > >
> > > >   for p in partitions:
> > > >     boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> > > >
> > > >     add p to partitionsAfterPruning where the predicate is true.
> > > >
> > > > else  tableSource.listPartitions() is empty:
> > > >   if the filter can be converted to ResolvedExpression &&
> > > >     the catalog can support the filter :
> > > >
> > > >     partitionsAfterPruning = catalog.listPartitionsByFilter()
> > > >
> > > >     the value of partitionsAfterPruning is all needed.
> > > >   else :
> > > >
> > > >     partitions = catalog.listPartitions()
> > > >     for p in partitions:
> > > >     boolean predicate = partitionPruner.map(convertPartitionToRow(p))
> > > >
> > > >      add p to partitionsAfterPruning where the predicate is true.
> > > >
> > > > I think the main logic can be classified into two sides, one exists
> in
> > > the
> > > > logical rule, and the other exists in the connector side. The catalog
> > > info
> > > > should be used on the rule side, and not on the connector side, the
> > > pruning
> > > > function could be used on both of them or unified on the connector
> side.
> > > >
> > > >
> > > > Proposed changes
> > > >
> > > >
> > > >    - add a new method in SupportsPartitionPushDown
> > > >    - let HiveSourceTable, HiveSourceBuilder, and
> > > >    HiveContinuousPartitionFetcher hold the pruning function.
> > > >    - pruning after fetchPartitions invoked.
> > > >
> > > > Considering the version compatibility and the optimization for the
> method
> > > > of listing partitions with filter in the catalog, I think we can add
> a
> > > new
> > > > method in *SupportsPartitionPushDown*
> > > >
> > > > /**
> > > > * Provides a list of remaining partitions. After those partitions are
> > > > applied, a source must
> > > > * not read the data of other partitions during runtime.
> > > > *
> > > > * <p>See the documentation of {@link SupportsPartitionPushDown} for
> more
> > > > information.
> > > > */
> > > > void applyPartitions(List<Map<String, String>> remainingPartitions);
> > > >
> > > > /**
> > > > * Provides a pruning function for uncommitted partitions.
> > > > */
> > > > default void applyPartitionPuringFunction(MapFunction<RowData,
> Boolean>
> > > > partitionPruningFunction) { }
> > > >
> > > > We can push the generated function into TableSource, such that the
> > > > ContinuousPartitionFetcher can get it.
> > > >
> > > > For Batch reading, the 'remainingPartitions' will be seen as the
> > > partitions
> > > > needed to consume, for streaming reading, we use the
> > > > 'partitionPruningFunction' to ignore the unneeded partitions.
> > > > Rejected Alternatives
> > > >
> > > > Do not remove the filter logic in Filter Node about the partition
> keys,
> > > if
> > > > the source will execute streaming reading.
> > > >
> > > >
> > > > Looking forward to your opinions.
> > > >
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-27898
> > > >
> > > > best
> > > >
> > > > zoucao
> > > >
> > >
>

Reply via email to