Hi zoucao,

I think this topic deserves a proper FLIP and a vote. This approach is
focused only on Hive, but I would also like to understand the implications
for FileSource. Can you create one?

Best regards,

Martijn

Op wo 22 jun. 2022 om 18:50 schreef cao zou <zoucao...@gmail.com>:

> Hi devs, I want to start a discussion to find a way to support partition
> pruning for streaming reading.
>
>
> Flink already supports partition pruning; the implementation consists
> of the *Source Ability*, the *Logical Rule*, and the interface
> *SupportsPartitionPushDown*, but they all only take effect in batch
> reading. When reading a table in streaming mode, the existing mechanism
> causes the problems reported in FLINK-27898
> <https://issues.apache.org/jira/browse/FLINK-27898>[1]: records that
> should be filtered out are sent downstream.
>
> This discussion is proposed to solve that drawback; Hive and other big
> data systems that store data in partitions will benefit from it.
>
> Currently, the partitions that need to be consumed are computed in
> *PushPartitionIntoTableSourceScanRule* and then pushed into the
> TableSource. This works well in batch mode, but it is not enough when
> reading records from Hive in streaming mode, where partitions committed
> in the future must also be considered.
>
> To support pruning partitions committed in the future, the pruning
> function should be pushed into the TableSource and then delivered to the
> *ContinuousPartitionFetcher*, so that pruning of uncommitted partitions
> can be invoked there.
>
> Before proposing the changes, I think it is necessary to clarify the
> existing pruning logic. The main logic of the pruning in
> *PushPartitionIntoTableSourceScanRule* is as follows.
>
> First, a pruning function called partitionPruner is generated; it
> extends RichMapFunction<GenericRowData, Boolean>.
>
>
> if tableSource.listPartitions() is not empty:
>   partitions = dynamicTableSource.listPartitions()
>   for p in partitions:
>     boolean predicate = partitionPruner.map(convertPartitionToRow(p))
>     add p to partitionsAfterPruning where the predicate is true
> else:  # tableSource.listPartitions() is empty
>   if the filter can be converted to a ResolvedExpression &&
>       the catalog supports the filter:
>     partitionsAfterPruning = catalog.listPartitionsByFilter()
>     # all of partitionsAfterPruning is needed
>   else:
>     partitions = catalog.listPartitions()
>     for p in partitions:
>       boolean predicate = partitionPruner.map(convertPartitionToRow(p))
>       add p to partitionsAfterPruning where the predicate is true
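> The pruning loop above can be sketched in plain Java. This is only an
> illustration: the partition is modeled as a Map of partition keys to
> values, PartitionPruner is a simplified stand-in for the generated
> RichMapFunction<GenericRowData, Boolean>, and the date predicate is a
> made-up example filter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RuleSidePruningSketch {

    // Simplified stand-in for the generated partitionPruner
    // (really a RichMapFunction<GenericRowData, Boolean> in Flink).
    interface PartitionPruner {
        boolean map(Map<String, String> partition) throws Exception;
    }

    // Keep only the partitions for which the predicate evaluates to true,
    // mirroring the "add p to partitionsAfterPruning" step above.
    static List<Map<String, String>> prune(
            List<Map<String, String>> partitions, PartitionPruner pruner) throws Exception {
        List<Map<String, String>> remaining = new ArrayList<>();
        for (Map<String, String> p : partitions) {
            if (pruner.map(p)) {
                remaining.add(p);
            }
        }
        return remaining;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical filter: dt >= '2022-06-21'
        List<Map<String, String>> result = prune(
                List.of(Map.of("dt", "2022-06-20"), Map.of("dt", "2022-06-22")),
                p -> p.get("dt").compareTo("2022-06-21") >= 0);
        System.out.println(result);
    }
}
```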
>
> I think the main logic can be classified into two sides: one in the
> logical rule and the other on the connector side. The catalog info is
> used only on the rule side, not on the connector side; the pruning
> function could be used on both, or unified on the connector side.
>
>
> Proposed changes
>
>
>    - add a new method to SupportsPartitionPushDown
>    - let HiveSourceTable, HiveSourceBuilder, and
>    HiveContinuousPartitionFetcher hold the pruning function
>    - prune after fetchPartitions is invoked
>
> Considering version compatibility and the optimization of listing
> partitions with a filter in the catalog, I think we can add a new method
> to *SupportsPartitionPushDown*:
>
> /**
>  * Provides a list of remaining partitions. After those partitions are
>  * applied, a source must not read the data of other partitions during
>  * runtime.
>  *
>  * <p>See the documentation of {@link SupportsPartitionPushDown} for more
>  * information.
>  */
> void applyPartitions(List<Map<String, String>> remainingPartitions);
>
> /**
>  * Provides a pruning function for uncommitted partitions.
>  */
> default void applyPartitionPruningFunction(MapFunction<RowData, Boolean>
>     partitionPruningFunction) { }
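> As a rough sketch of how a source could keep both pushed-down results
> (everything here except the two method names is hypothetical, and
> MapFunction and RowData are minimal stand-ins for the Flink types of the
> same names):

```java
import java.util.List;
import java.util.Map;

public class SourcePushDownSketch {

    // Minimal stand-ins for the Flink types of the same names.
    interface MapFunction<I, O> { O map(I in) throws Exception; }
    interface RowData { }

    // Hypothetical streaming table source keeping both pushed-down results.
    static class MyStreamingSource {
        List<Map<String, String>> remainingPartitions; // batch path
        MapFunction<RowData, Boolean> pruningFunction; // streaming path

        public void applyPartitions(List<Map<String, String>> remainingPartitions) {
            this.remainingPartitions = remainingPartitions;
        }

        public void applyPartitionPruningFunction(MapFunction<RowData, Boolean> f) {
            // Keep the function so it can later be handed down to the
            // ContinuousPartitionFetcher when the source is created.
            this.pruningFunction = f;
        }
    }

    public static void main(String[] args) {
        MyStreamingSource source = new MyStreamingSource();
        source.applyPartitionPruningFunction(row -> true);
        System.out.println(source.pruningFunction != null); // prints true
    }
}
```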
>
> We can push the generated function into the TableSource so that the
> ContinuousPartitionFetcher can get it.
>
> For batch reading, the 'remainingPartitions' are treated as the
> partitions that need to be consumed; for streaming reading, we use the
> 'partitionPruningFunction' to ignore the unneeded partitions.
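> On the fetcher side, the idea could look like the following sketch. It
> is only an illustration: partitions are modeled as Maps, Pruner stands
> in for the pushed-down MapFunction<RowData, Boolean>, and the 'region'
> filter is a made-up example; the real code would run inside
> HiveContinuousPartitionFetcher after fetchPartitions returns.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FetcherSideSketch {

    // Stand-in for the pushed-down MapFunction<RowData, Boolean>; the
    // partition is modeled as a Map for readability.
    interface Pruner {
        boolean map(Map<String, String> partition) throws Exception;
    }

    // After the newly committed partitions are fetched, drop the ones the
    // pushed-down function rejects, so records of unneeded partitions are
    // never sent downstream.
    static List<Map<String, String>> pruneFetched(
            List<Map<String, String>> fetched, Pruner pruner) {
        return fetched.stream().filter(p -> {
            try {
                return pruner.map(p);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical newly committed partitions and filter region = 'eu'.
        List<Map<String, String>> fetched = List.of(
                Map.of("region", "eu"), Map.of("region", "us"));
        System.out.println(pruneFetched(fetched, p -> "eu".equals(p.get("region"))));
    }
}
```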
> Rejected Alternatives
>
> Do not remove the filter logic about the partition keys from the Filter
> node if the source executes streaming reading.
>
>
> Looking forward to your opinions.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-27898
>
> best
>
> zoucao
>
