Hi Yuxia,

I made a mistake in the above response.

The runtime filter can work well with all shuffle mode. However, hybrid
shuffle and blocking shuffle are currently recommended for batch jobs
(piepline shuffle is not recommended).

One more thing to mention here is that we will force the edge between
RuntimeFilterBuilder and RuntimeFilter to be BLOCKING(regardless of which
BatchShuffleMode is set). Because the RuntimeFilter really doesn’t need to
run before the RuntimeFilterBuilder finished. Even if the RuntimeFilter
becomes running before the RuntimeFilterBuilder finished, it will not
process any data and will occupy resources.

Best,
Lijie

Lijie Wang <wangdachui9...@gmail.com> 于2023年6月15日周四 09:48写道:

> Hi Yuxia,
>
> Thanks for your feedback. The answers of your questions are as follows:
>
> 1. Yes, the row count comes from statistic of underlying table(Or
> estimated based on the statistic of underlying table, if the build side or
> probe side is not TableScan).  If the statistic unavailable, we will not
> inject a runtime filter(As you said, we can hardly evaluate the benefits).
> Besides, AFAIK, the estimated data size of build side is also based on the
> row count statistics, that is, if the statistics is unavailable, the
> requirement "table.optimizer.runtime-filter.max-build-data-size" cannot be
> evaluated either. I'll add this point into FLIP.
>
> 2.
> Estimated data size does not meet requirement (in planner optimization
> phase) -> No filter
> Estimated data size meets the requirement (in planner optimization phase),
> but the real data size does not meet the requirement(in execution phase) ->
> Fake filter
>
> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
>
> Best,
> Lijie
>
> yuxia <luoyu...@alumni.sjtu.edu.cn> 于2023年6月14日周三 20:37写道:
>
>> Thanks Lijie for starting this discussion. Excited to see runtime filter
>> is to be implemented in Flink.
>> I have few questions about it:
>>
>> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
>> instead`. So, does row count comes from the statistic from underlying
>> table? What if the the statistic is also unavailable considering users
>> maynot always remember to generate statistic in production.
>> I'm wondering whether it make senese that just disable runtime filter if
>> statistic is unavailable since in that case, we can hardly evaluate the
>> benefits of runtime-filter.
>>
>>
>> 2: The FLIP said: "We will inject the runtime filters only if the
>> following requirements are met:xxx", but it also said, "Once this limit is
>> exceeded, it will output a fake filter(which always returns true)" in
>> `RuntimeFilterBuilderOperator` part; Seems they are contradictory, so i'm
>> wondering what's the real behavior, no filter will be injected or fake
>> filter?
>>
>>
>> 3: Does it also mean runtime-filter can only take effect in blocking
>> shuffle?
>>
>>
>>
>> Best regards,
>> Yuxia
>>
>> ----- 原始邮件 -----
>> 发件人: "ron9 liu" <ron9....@gmail.com>
>> 收件人: "dev" <dev@flink.apache.org>
>> 发送时间: 星期三, 2023年 6 月 14日 下午 5:29:28
>> 主题: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>>
>> Thanks Lijie start this discussion. Runtime Filter is a common
>> optimization
>> to improve the join performance that has been adopted by many computing
>> engines such as Spark, Doris, etc... Flink is a streaming batch computing
>> engine, and we are continuously optimizing the performance of batches.
>> Runtime filter is a general performance optimization technique that can
>> improve the performance of Flink batch jobs, so we are introducing it on
>> batch as well.
>>
>> Looking forward to all feedback.
>>
>> Best,
>> Ron
>>
>> Lijie Wang <wangdachui9...@gmail.com> 于2023年6月14日周三 17:17写道:
>>
>> > Hi devs
>> >
>> > Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
>> > Introduce Runtime Filter for Flink Batch Jobs[1]
>> >
>> > Runtime Filter is a common optimization to improve join performance. It
>> is
>> > designed to dynamically generate filter conditions for certain Join
>> queries
>> > at runtime to reduce the amount of scanned or shuffled data, avoid
>> > unnecessary I/O and network transmission, and speed up the query. Its
>> > working principle is building a filter(e.g. bloom filter) based on the
>> data
>> > on the small table side(build side) first, then pass this filter to the
>> > large table side(probe side) to filter the irrelevant data on it, this
>> can
>> > reduce the data reaching the join and improve performance.
>> >
>> > You can find more details in the FLIP-324[1]. Looking forward to your
>> > feedback.
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
>> >
>> > Best,
>> > Ron & Gen & Lijie
>> >
>>
>

Reply via email to