Hi devs, I want to start a discussion to find a way to support partition
pruning for streaming reading.


Flink already supports partition pruning; the implementation consists of a
*Source Ability*, a *Logical Rule*, and the interface
*SupportsPartitionPushDown*, but they all only take effect in batch
reading. When reading a table in streaming mode, the existing mechanism
causes the problems reported in FLINK-27898
<https://issues.apache.org/jira/browse/FLINK-27898>[1]: records that
should be filtered out are sent downstream.

This discussion is proposed to solve this drawback; Hive and other big-data
systems that store data in partitions will benefit from it.

Currently, the existing partitions that need to be consumed are computed
in *PushPartitionIntoTableSourceScanRule*, and then the partitions are
pushed into the TableSource. This works well in batch mode, but it is not
enough if we want to read records from Hive in streaming mode and also
consider the partitions committed in the future.

To support pruning the partitions committed in the future, the pruning
function should be pushed into the TableSource and then delivered to
*ContinuousPartitionFetcher*, so that the pruning of uncommitted
partitions can be invoked there.

Before proposing the changes, I think it is necessary to clarify the
existing pruning logic. The main logic of the pruning in
*PushPartitionIntoTableSourceScanRule* is as follows.

First, a pruning function called partitionPruner is generated; it extends
RichMapFunction<GenericRowData, Boolean>.


if tableSource.listPartitions() is not empty:
  partitions = dynamicTableSource.listPartitions()
  for p in partitions:
    predicate = partitionPruner.map(convertPartitionToRow(p))
    add p to partitionsAfterPruning if predicate is true
else:  # tableSource.listPartitions() is empty
  if the filter can be converted to a ResolvedExpression &&
      the catalog supports filtering by it:
    partitionsAfterPruning = catalog.listPartitionsByFilter()
    # every partition in partitionsAfterPruning is needed
  else:
    partitions = catalog.listPartitions()
    for p in partitions:
      predicate = partitionPruner.map(convertPartitionToRow(p))
      add p to partitionsAfterPruning if predicate is true
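To make the flow concrete, here is a minimal, self-contained Java sketch of
the rule-side pruning loop. It is only an illustration: the partition is
modeled as a plain Map<String, String> and the generated
RichMapFunction<GenericRowData, Boolean> is replaced by a
java.util.function.Predicate, so the class name PruningSketch and its
method are hypothetical stand-ins, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PruningSketch {

    // Stand-in for the loop in PushPartitionIntoTableSourceScanRule:
    // each partition is converted to a row and fed to the generated
    // pruning function; only partitions evaluating to true are kept.
    static List<Map<String, String>> prune(
            List<Map<String, String>> partitions,
            Predicate<Map<String, String>> partitionPruner) {
        List<Map<String, String>> partitionsAfterPruning = new ArrayList<>();
        for (Map<String, String> p : partitions) {
            // In the real rule: partitionPruner.map(convertPartitionToRow(p))
            if (partitionPruner.test(p)) {
                partitionsAfterPruning.add(p);
            }
        }
        return partitionsAfterPruning;
    }

    public static void main(String[] args) {
        List<Map<String, String>> partitions = List.of(
                Map.of("dt", "2022-06-01"),
                Map.of("dt", "2022-06-02"));

        // Example filter: dt >= '2022-06-02'
        List<Map<String, String>> kept = prune(
                partitions,
                p -> p.get("dt").compareTo("2022-06-02") >= 0);

        System.out.println(kept.size());           // 1
        System.out.println(kept.get(0).get("dt")); // 2022-06-02
    }
}
```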

I think the main logic can be split into two sides: one lives in the
logical rule, and the other on the connector side. The catalog info
should be used only on the rule side, not on the connector side; the
pruning function could be used on both sides, or unified on the connector
side.


Proposed changes


   - add a new method to SupportsPartitionPushDown
   - let HiveSourceTable, HiveSourceBuilder, and
   HiveContinuousPartitionFetcher hold the pruning function.
   - prune after fetchPartitions is invoked.

Considering version compatibility and the optimization of listing
partitions with a filter in the catalog, I think we can add a new method
to *SupportsPartitionPushDown*:

/**
 * Provides a list of remaining partitions. After those partitions are applied, a source
 * must not read the data of other partitions during runtime.
 *
 * <p>See the documentation of {@link SupportsPartitionPushDown} for more information.
 */
void applyPartitions(List<Map<String, String>> remainingPartitions);

/**
 * Provides a pruning function for uncommitted partitions.
 */
default void applyPartitionPruningFunction(
        MapFunction<RowData, Boolean> partitionPruningFunction) { }

We can push the generated function into the TableSource, so that the
ContinuousPartitionFetcher can get it.
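To illustrate where the pushed-down function would be applied, here is a
self-contained sketch of a fetcher that prunes newly discovered partitions
after fetchPartitions. FetcherSketch and its types are hypothetical
stand-ins; the real ContinuousPartitionFetcher works with Flink's
MapFunction<RowData, Boolean> and Hive partition objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FetcherSketch {

    // Stand-in for the pruning function delivered by the planner via the
    // proposed applyPartitionPruningFunction method.
    private final Predicate<Map<String, String>> pruningFunction;

    FetcherSketch(Predicate<Map<String, String>> pruningFunction) {
        this.pruningFunction = pruningFunction;
    }

    // Stand-in for ContinuousPartitionFetcher#fetchPartitions: newly
    // committed partitions are discovered, then pruned before they are
    // handed to the source readers.
    List<Map<String, String>> fetchPartitions(
            List<Map<String, String>> newlyCommitted) {
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> p : newlyCommitted) {
            if (pruningFunction.test(p)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example: only consume partitions with year = 2022.
        FetcherSketch fetcher =
                new FetcherSketch(p -> "2022".equals(p.get("year")));

        List<Map<String, String>> fetched = fetcher.fetchPartitions(List.of(
                Map.of("year", "2021"),
                Map.of("year", "2022")));

        // Only the year=2022 partition remains after pruning.
        System.out.println(fetched.size()); // 1
    }
}
```

The point of the sketch is that the pruning decision moves to fetch time,
so partitions committed after the job starts are still filtered correctly.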

For batch reading, the 'remainingPartitions' will be treated as the
partitions to consume; for streaming reading, the
'partitionPruningFunction' is used to ignore the unneeded partitions.

Rejected Alternatives

Do not remove the filter logic about the partition keys from the Filter
node if the source executes streaming reading.


Looking forward to your opinions.


[1] https://issues.apache.org/jira/browse/FLINK-27898

best

zoucao
