Hi, Godfrey
Thanks for updating the FLIP.
It looks good to me now.

Best,
Jing Zhang

godfrey he <godfre...@gmail.com> wrote on Tue, Jul 26, 2022 at 12:33:

> Thanks for all the inputs, I have updated the document and POC code.
>
>
> Best,
> Godfrey
>
> Yun Gao <yungao...@aliyun.com.invalid> wrote on Tue, Jul 26, 2022 at 11:11:
> >
> > Hi,
> >
> > Thanks all for the valuable discussion on this FLIP. +1 for implementing
> > dynamic partition pruning / dynamic filtering pushdown, since it is a key
> > optimization for improving the performance of batch processing.
> >
> > Also, since speculative execution is being introduced for batch processing,
> > we might need to consider the case with speculative execution enabled:
> > 1. The operator coordinator of DynamicFilteringDataCollector should ignore
> > any subsequent filtering data, since a task might execute in multiple attempts.
> > 2. The DynamicFileSplitEnumerator should also implement the
> > `SupportsHandleExecutionAttemptSourceEvent` interface; otherwise it would
> > throw an exception when it receives the filtering data source event.
> >
> > Best,
> > Yun Gao
> >
> >
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Jing Ge <j...@ververica.com>
> > Send Time:2022 Jul. 21 (Thu.) 18:56
> > To:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning
> >
> > Hi,
> >
> > Thanks for the informative discussion! Looking forward to using dynamic
> > filtering provided by Flink.
> >
> > Best regards,
> > Jing
> >
> > On Tue, Jul 19, 2022 at 3:22 AM godfrey he <godfre...@gmail.com> wrote:
> >
> > > Hi Jingsong, Jark, Jing,
> > >
> > > Thanks for the important inputs.
> > > Lake storage is a very important scenario. Considering more generic
> > > and extended cases,
> > > I would also like to use the "dynamic filtering" concept instead of
> > > "dynamic partition".
> > >
> > > >maybe the FLIP should also demonstrate the EXPLAIN result, which
> > > is also an API.
> > > I will add a section to describe the EXPLAIN result.
> > >
> > > >Does DPP also support streaming queries?
> > > Yes, but only for bounded sources.
> > >
> > > >it requires that the SplitEnumerator implement the newly introduced
> > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > +1
> > >
> > > I will update the document and the POC code.
> > >
> > > Best,
> > > Godfrey
> > >
> > > Jing Zhang <beyond1...@gmail.com> wrote on Wed, Jul 13, 2022 at 20:22:
> > > >
> > > > Hi Godfrey,
> > > > Thanks for driving this discussion.
> > > > This is an important improvement for batch SQL jobs.
> > > > I agree with Jingsong to expand the capability to more than just
> > > > partitions.
> > > > Besides, I have two points:
> > > > 1. Based on FLIP-248[1],
> > > >
> > > > > Dynamic partition pruning mechanism can improve performance by avoiding
> > > > > reading large amounts of irrelevant data, and it works for both batch and
> > > > > streaming queries.
> > > >
> > > > Does DPP also support streaming queries?
> > > > It seems the proposed changes in FLIP-248 do not work for streaming
> > > > queries, because the dimension table might be an unbounded input.
> > > > Or does it require all dimension tables to be bounded inputs for
> > > > streaming jobs if the job wants to enable DPP?
> > > >
> > > > 2. I notice there are changes to the SplitEnumerator for the Hive source
> > > > and the File source, and they now depend on SourceEvent to pass PartitionData.
> > > > In FLIP-245[2], if speculative execution is enabled for FLIP-27 based sources
> > > > that use SourceEvent, the SplitEnumerator must implement the newly introduced
> > > > `SupportsHandleExecutionAttemptSourceEvent` interface,
> > > > otherwise an exception would be thrown.
> > > > Since Hive and File sources are commonly used for batch jobs, it's
> > > > better to take this point into consideration.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > [1] FLIP-248:
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > [2] FLIP-245:
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > >
> > > >
> > > > Jark Wu <imj...@gmail.com> wrote on Tue, Jul 12, 2022 at 13:16:
> > > >
> > > > > I agree with Jingsong. DPP is a particular case of Dynamic Filter
> > > > > Pushdown in which the join key contains partition fields. Extending
> > > > > this FLIP to general filter pushdown can benefit more optimizations,
> > > > > and they can share the same interface.
> > > > >
> > > > > For example, Trino Hive Connector leverages dynamic filtering to support:
> > > > > - dynamic partition pruning for partitioned tables
> > > > > - and dynamic bucket pruning for bucket tables
> > > > > - and dynamic filter pushed into the ORC and Parquet readers to perform
> > > > >   stripe or row-group pruning and save on disk I/O.
> > > > >
> > > > > Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or
> > > > > Dynamic Filtering),
> > > > > just like Trino [1].  The interfaces should also be adapted for that.
> > > > >
> > > > > Besides, maybe the FLIP should also demonstrate the EXPLAIN result,
> > > > > which is also an API.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
> > > > >
> > > > > On Tue, 12 Jul 2022 at 09:59, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > >
> > > > > > Thanks Godfrey for driving.
> > > > > >
> > > > > > I like this FLIP.
> > > > > >
> > > > > > We can extend this capability to more than just partitions.
> > > > > > Here are some inputs from Lake Storage.
> > > > > >
> > > > > > The format of the splits generated by Lake Storage is roughly as follows:
> > > > > > Split {
> > > > > >    Path filePath;
> > > > > >    Statistics[] fieldStats;
> > > > > > }
> > > > > >
> > > > > > Stats contain the min and max of each column.
> > > > > >
> > > > > > If the storage is sorted by a column, split filtering on that
> > > > > > column will be very effective, so the RuntimeFilter is worth
> > > > > > pushing down not only for the partition field but also for
> > > > > > this column.
> > > > > > This information is known only to the source, so I suggest that
> > > > > > the source return which fields are worth pushing down.
> > > > > >
> > > > > > My overall point is:
> > > > > > This FLIP can be extended to support Source Runtime Filter push-down
> > > > > > for all fields, not just dynamic partition pruning.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > > On Fri, Jul 8, 2022 at 10:12 PM godfrey he <godfre...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > > > > > partition pruning.
> > > > > > >
> > > > > > > Currently, Flink supports static partition pruning: the conditions in
> > > > > > > the WHERE clause are analyzed in the optimization phase
> > > > > > > to determine in advance which partitions can be safely skipped.
> > > > > > > Another common scenario: the partition information is not available
> > > > > > > in the optimization phase but only in the execution phase.
> > > > > > > That's the problem this FLIP is trying to solve: dynamic partition
> > > > > > > pruning, which could reduce the IO of the partitioned table source.
> > > > > > >
> > > > > > > The query pattern looks like:
> > > > > > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > > > > > d_date_sk and d_year = 2000
> > > > > > >
> > > > > > > We will introduce a mechanism for detecting dynamic partition pruning
> > > > > > > patterns in the optimization phase
> > > > > > > and performing partition pruning at runtime by sending the dimension
> > > > > > > table results to the SplitEnumerator
> > > > > > > of the fact table via the existing coordinator mechanism.
> > > > > > >
> > > > > > > You can find more details in FLIP-248 document[1].
> > > > > > > Looking forward to any feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > > > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Godfrey
> > > > > >
> > > > > >
> >
>
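Jingsong's point in the thread above, that splits carry per-column min/max statistics and can therefore be filtered on columns other than the partition field, can be illustrated with a small sketch. This is only an illustration under stated assumptions: `SplitPruningSketch`, `ColumnStats`, and `prune` are hypothetical names, not part of Flink or of any lake storage API, and column values are assumed to be `long`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch: none of these types are Flink or lake storage APIs.
class SplitPruningSketch {

    // Inclusive min and max of one column within a split.
    static final class ColumnStats {
        final long min;
        final long max;

        ColumnStats(long min, long max) {
            this.min = min;
            this.max = max;
        }
    }

    // Mirrors the Split shape from the thread: a file path plus per-column stats.
    static final class Split {
        final String filePath;
        final ColumnStats[] fieldStats;

        Split(String filePath, ColumnStats... fieldStats) {
            this.filePath = filePath;
            this.fieldStats = fieldStats;
        }
    }

    // Keeps the splits whose [min, max] range on the given column contains
    // at least one of the key values collected at runtime.
    static List<Split> prune(List<Split> splits, int column, TreeSet<Long> keys) {
        List<Split> kept = new ArrayList<>();
        for (Split split : splits) {
            ColumnStats stats = split.fieldStats[column];
            // Smallest key >= min; the split can match only if that key is <= max.
            Long candidate = keys.ceiling(stats.min);
            if (candidate != null && candidate <= stats.max) {
                kept.add(split);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Split> splits = Arrays.asList(
                new Split("part-0", new ColumnStats(0, 99)),
                new Split("part-1", new ColumnStats(100, 199)),
                new Split("part-2", new ColumnStats(200, 299)));
        TreeSet<Long> keys = new TreeSet<>(Arrays.asList(150L, 250L));
        for (Split split : prune(splits, 0, keys)) {
            System.out.println(split.filePath);
        }
    }
}
```

When the storage is sorted by the filtered column, the min/max ranges of different splits overlap little, so a check like this can discard most splits; on an unsorted column the ranges tend to cover the whole value domain and little is pruned.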

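The mechanism in Godfrey's original proposal (evaluate the dimension-table filter at runtime, then use the resulting join keys to skip fact-table partitions) can be sketched for the `store_returns` / `date_dim` query as follows. This is a rough illustration, not the FLIP's implementation: plain Java collections stand in for the planner, the coordinator, and the SplitEnumerator, and `DynamicPartitionPruningSketch`, `collectFilteringData`, and `prunePartitions` are hypothetical names. It also assumes `store_returns` is partitioned by `sr_returned_date_sk`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: plain collections stand in for Flink's planner,
// coordinator and SplitEnumerator; no Flink API is used here.
class DynamicPartitionPruningSketch {

    // One row of the dimension table date_dim.
    static final class DateDimRow {
        final long dDateSk;
        final int dYear;

        DateDimRow(long dDateSk, int dYear) {
            this.dDateSk = dDateSk;
            this.dYear = dYear;
        }
    }

    // Dimension side: apply d_year = year and collect the join keys (d_date_sk).
    static Set<Long> collectFilteringData(List<DateDimRow> dateDim, int year) {
        Set<Long> keys = new HashSet<>();
        for (DateDimRow row : dateDim) {
            if (row.dYear == year) {
                keys.add(row.dDateSk);
            }
        }
        return keys;
    }

    // Fact side: keep only the partitions whose sr_returned_date_sk value
    // appears among the collected keys. The map is partition value -> path.
    static List<String> prunePartitions(Map<Long, String> partitions, Set<Long> keys) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<Long, String> partition : partitions.entrySet()) {
            if (keys.contains(partition.getKey())) {
                kept.add(partition.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<DateDimRow> dateDim = Arrays.asList(
                new DateDimRow(1L, 1999),
                new DateDimRow(2L, 2000),
                new DateDimRow(3L, 2000));
        Map<Long, String> partitions = new LinkedHashMap<>();
        partitions.put(1L, "sr_returned_date_sk=1");
        partitions.put(2L, "sr_returned_date_sk=2");
        partitions.put(4L, "sr_returned_date_sk=4");

        Set<Long> keys = collectFilteringData(dateDim, 2000);
        System.out.println(prunePartitions(partitions, keys));
    }
}
```

The key set exists only after the dimension side has been read, which is why this pruning cannot happen in the optimization phase and why, as discussed in the thread, the dimension input has to be bounded.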