Thank you Godfrey, the FLIP looks good to me.

Best,
Jark

On Tue, 26 Jul 2022 at 12:34, godfrey he <godfre...@gmail.com> wrote:

> Thanks for all the inputs, I have updated the document and POC code.
>
> Best,
> Godfrey
>
> On Tue, Jul 26, 2022 at 11:11, Yun Gao <yungao...@aliyun.com.invalid> wrote:
> >
> > Hi,
> >
> > Thanks all for the valuable discussion on this FLIP. +1 for implementing
> > dynamic partition pruning / dynamic filtering pushdown, since it is a key
> > optimization for improving batch processing performance.
> >
> > Also, since speculative execution is being introduced for batch
> > processing, we might need to consider the case where speculative
> > execution is enabled:
> > 1. The operator coordinator of DynamicFilteringDataCollector should
> > ignore subsequent filtering data, considering that the task might
> > execute in multiple attempts.
> > 2. The DynamicFileSplitEnumerator should also implement the
> > `SupportsHandleExecutionAttemptSourceEvent` interface, otherwise it
> > would throw an exception when it receives the filtering data source
> > event.
> >
> > Best,
> > Yun Gao
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> >
> > ------------------------------------------------------------------
> > From: Jing Ge <j...@ververica.com>
> > Send Time: 2022 Jul. 21 (Thu.) 18:56
> > To: dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning
> >
> > Hi,
> >
> > Thanks for the informative discussion! Looking forward to using dynamic
> > filtering provided by Flink.
> >
> > Best regards,
> > Jing
> >
> > On Tue, Jul 19, 2022 at 3:22 AM godfrey he <godfre...@gmail.com> wrote:
> >
> > > Hi Jingsong, Jark, Jing,
> > >
> > > Thanks for the important inputs.
> > > Lake storage is a very important scenario, and considering more
> > > generic and extended cases, I would also like to use the "dynamic
> > > filtering" concept instead of "dynamic partition".
> > >
> > > >maybe the FLIP should also demonstrate the EXPLAIN result, which
> > > >is also an API.
> > > I will add a section to describe the EXPLAIN result.
> > >
> > > >Does DPP also support streaming queries?
> > > Yes, but only for bounded sources.
> > >
> > > >it requires that the SplitEnumerator implement the newly introduced
> > > >`SupportsHandleExecutionAttemptSourceEvent` interface,
> > > +1
> > >
> > > I will update the document and the POC code.
> > >
> > > Best,
> > > Godfrey
> > >
> > > On Wed, Jul 13, 2022 at 20:22, Jing Zhang <beyond1...@gmail.com> wrote:
> > > >
> > > > Hi Godfrey,
> > > > Thanks for driving this discussion.
> > > > This is an important improvement for batch SQL jobs.
> > > > I agree with Jingsong to expand the capability to more than just
> > > > partitions.
> > > > Besides, I have two points:
> > > > 1. Based on FLIP-248[1],
> > > >
> > > > > Dynamic partition pruning mechanism can improve performance by
> > > > > avoiding reading large amounts of irrelevant data, and it works
> > > > > for both batch and streaming queries.
> > > >
> > > > Does DPP also support streaming queries?
> > > > It seems the proposed changes in FLIP-248 do not work for streaming
> > > > queries, because the dimension table might be an unbounded input.
> > > > Or does it require all dimension tables to be bounded inputs for
> > > > streaming jobs if the job wants to enable DPP?
> > > >
> > > > 2. I notice there are changes to SplitEnumerator for the Hive source
> > > > and the File source.
> > > > They now depend on SourceEvent to pass PartitionData.
> > > > In FLIP-245, if speculative execution is enabled for FLIP-27-based
> > > > sources which use SourceEvent, it requires that the SplitEnumerator
> > > > implement the newly introduced
> > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > otherwise an exception would be thrown.
> > > > Since Hive and File sources are commonly used for batch jobs, it's
> > > > better to take this point into consideration.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > [1] FLIP-248:
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > [2] FLIP-245:
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > >
> > > > On Tue, Jul 12, 2022 at 13:16, Jark Wu <imj...@gmail.com> wrote:
> > > >
> > > > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > > > > Pushdown in which the join key contains partition fields.
> > > > > Extending this FLIP to general filter pushdown can benefit more
> > > > > optimizations, and they can share the same interface.
> > > > >
> > > > > For example, the Trino Hive Connector leverages dynamic filtering
> > > > > to support:
> > > > > - dynamic partition pruning for partitioned tables
> > > > > - dynamic bucket pruning for bucketed tables
> > > > > - dynamic filters pushed into the ORC and Parquet readers to
> > > > > perform stripe or row-group pruning and save on disk I/O.
> > > > >
> > > > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > > > > Dynamic Filtering), just like Trino [1]. The interfaces should
> > > > > also be adapted for that.
> > > > >
> > > > > Besides, maybe the FLIP should also demonstrate the EXPLAIN
> > > > > result, which is also an API.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > > > >
> > > > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > >
> > > > > > Thanks Godfrey for driving.
> > > > > >
> > > > > > I like this FLIP.
> > > > > >
> > > > > > We can extend this capability to more than just partitions.
> > > > > > Here are some inputs from Lake Storage.
> > > > > >
> > > > > > The format of the splits generated by Lake Storage is roughly as
> > > > > > follows:
> > > > > > Split {
> > > > > >     Path filePath;
> > > > > >     Statistics[] fieldStats;
> > > > > > }
> > > > > >
> > > > > > Stats contain the min and max of each column.
> > > > > >
> > > > > > If the storage is sorted by a column, split filtering on that
> > > > > > column will be very effective, so not only the partition field
> > > > > > but also this column is worth a RuntimeFilter push-down.
> > > > > > Only the source knows this, so I suggest that the source report
> > > > > > which fields are worth pushing down.
> > > > > >
> > > > > > My overall point is:
> > > > > > This FLIP can be extended to support Source Runtime Filter
> > > > > > push-down for all fields, not just dynamic partition pruning.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <godfre...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to open a discussion on FLIP-248: Introduce
> > > > > > > dynamic partition pruning.
> > > > > > >
> > > > > > > Currently, Flink supports static partition pruning: the
> > > > > > > conditions in the WHERE clause are analyzed in the
> > > > > > > optimization phase to determine in advance which partitions
> > > > > > > can be safely skipped.
> > > > > > > Another common scenario: the partition information is not
> > > > > > > available in the optimization phase but only in the execution
> > > > > > > phase.
> > > > > > > That's the problem this FLIP is trying to solve: dynamic
> > > > > > > partition pruning, which could reduce the partitioned table
> > > > > > > source IO.
> > > > > > >
> > > > > > > The query pattern looks like:
> > > > > > > select * from store_returns, date_dim where
> > > > > > > sr_returned_date_sk = d_date_sk and d_year = 2000
> > > > > > >
> > > > > > > We will introduce a mechanism for detecting dynamic partition
> > > > > > > pruning patterns in the optimization phase and performing
> > > > > > > partition pruning at runtime by sending the dimension table
> > > > > > > results to the SplitEnumerator of the fact table via the
> > > > > > > existing coordinator mechanism.
> > > > > > >
> > > > > > > You can find more details in the FLIP-248 document[1].
> > > > > > > Looking forward to any feedback.
> > > > > > >
> > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > > > >
> > > > > > > Best,
> > > > > > > Godfrey
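
For readers following the thread, Jingsong's point about splits carrying per-column min/max statistics could be sketched roughly as below. This is only an illustration under stated assumptions, not the FLIP-248 design or any Flink API: `StatsSplit`, `DynamicFilterSketch` and `prune` are hypothetical names, the statistics are reduced to a single `long` join column, and the runtime keys are made-up values standing in for the dimension-side results sent to the fact-table enumerator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical split: a file path plus min/max statistics for one join column. */
final class StatsSplit {
    final String filePath;
    final long min; // smallest join-key value stored in the file
    final long max; // largest join-key value stored in the file

    StatsSplit(String filePath, long min, long max) {
        this.filePath = filePath;
        this.min = min;
        this.max = max;
    }
}

/** Illustrative only; this is not Flink's SplitEnumerator API. */
final class DynamicFilterSketch {

    /** Keeps a split only if at least one build-side key falls inside [min, max]. */
    static List<StatsSplit> prune(List<StatsSplit> splits, Collection<Long> buildSideKeys) {
        List<StatsSplit> kept = new ArrayList<>();
        for (StatsSplit split : splits) {
            for (long key : buildSideKeys) {
                if (key >= split.min && key <= split.max) {
                    kept.add(split);
                    break; // one matching key is enough to keep the split
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<StatsSplit> splits = Arrays.asList(
                new StatsSplit("part-0", 1L, 100L),
                new StatsSplit("part-1", 101L, 200L),
                new StatsSplit("part-2", 201L, 300L));
        // Keys collected from the filtered dimension side at runtime (made-up values).
        List<Long> keys = Arrays.asList(150L, 160L);
        for (StatsSplit split : prune(splits, keys)) {
            System.out.println(split.filePath);
        }
    }
}
```

With the sample data above, only `part-1` survives, because neither key falls in the ranges of the other two files. The sketch also suggests why a sorted column filters well: sorting keeps each file's [min, max] range narrow, so fewer files overlap the runtime keys.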