Hi Jing,

Thanks for your feedback.

> Afaiu, the runtime filter will only be injected when the gap between the build data size and probe data size is big enough. Let's make an extreme example. If the small table (build side) has one row and the large table (probe side) contains tens of billions of rows, this will be the ideal use case for the runtime filter and the improvement will be significant. Is this correct?

Yes, you are right.

> Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will the value of max-build-data-size and min-probe-data-size depend on the parallelism config? I.e. with the same data-size setting, is it possible to inject or not inject runtime filters by adjusting the parallelism?

First, let me clarify two points. The first is that whether to inject a RuntimeFilter is decided in the optimization phase, and we currently do not consider operator parallelism in the SQL optimization phase; parallelism is set at the ExecNode level. The second is that batch mode now uses the AdaptiveBatchScheduler[1] by default, which derives the parallelism of a downstream operator from the amount of data produced by its upstream operator, so the parallelism is determined adaptively at runtime. Given both points, we cannot decide in the optimization stage whether to inject a BloomFilter based on parallelism.

A more important point is that the purpose of the runtime filter is to reduce the amount of shuffled data, and thus the amount of data processed by the downstream join operator. So, as I understand it, regardless of the parallelism of the probe side, inserting the runtime filter reduces the shuffled data, which benefits the join operator. Whether to insert the RuntimeFilter therefore does not depend on the parallelism.

> Does it make sense to reconsider the formula of ratio calculation to help users easily control the filter injection?

Row count is considered only when the ndv is not available. When the data sizes use their default values and the ndv cannot be obtained, it is true that this condition may always hold, but this does not seem to affect anything, and users are also likely to change the size values. One question: how do you think we should make it easier for users to control the filter injection?

[1]: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler

Best,
Ron

Jing Ge <j...@ververica.com.invalid> wrote on Tue, Jun 20, 2023 at 07:11:

> Hi Lijie,
>
> Thanks for your proposal. It is a really nice feature. I'd like to ask a few questions to understand your thoughts.
>
> Afaiu, the runtime filter will only be injected when the gap between the build data size and probe data size is big enough. Let's make an extreme example. If the small table (build side) has one row and the large table (probe side) contains tens of billions of rows, this will be the ideal use case for the runtime filter and the improvement will be significant. Is this correct?
>
> Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will the value of max-build-data-size and min-probe-data-size depend on the parallelism config? I.e. with the same data-size setting, is it possible to inject or not inject runtime filters by adjusting the parallelism?
>
> In the FLIP, there are default values for the new configuration parameters that will be used to check the injection condition. If ndv cannot be estimated, row count will be used. Given the max-build-data-size is 10MB and the min-probe-data-size is 10GB, in the worst case, the filter ratio will be 0.999, i.e. the probeNdv is 1000 times buildNdv. If we consider the duplication and the fact that the large table might have more columns than the small table, the probeNdv should still be 100 or 10 times buildNdv, which ends up with a filter ratio equal to 0.99 or 0.9. Both are bigger than the default value 0.5 in the FLIP. If I am not mistaken, commonly, a min-filter-ratio less than 0.99 will always allow injecting the runtime filter. Does it make sense to reconsider the formula of ratio calculation to help users easily control the filter injection?
>
> Best regards,
> Jing
>
> On Mon, Jun 19, 2023 at 4:42 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>>>> bypassing the dataflow
>> I believe it's a possible solution, but it may require more coordination and extra conditions (such as DFS), so I do think it should be excluded from the first version. I'll put it in Future+Improvements as a potential improvement.
>>
>> Thanks again for your quick reply :)
>>
>> Best,
>> Lijie
>>
>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 20:51:
>>
>>> Hi Lijie,
>>>
>>> I think you understood me correctly. But I would not consider this a true cyclic dependency in the dataflow, because I would not suggest sending the filter through an edge in the job graph from join to scan. I'd rather bypass the stream graph for bringing the filter to the scan. For example, the join could report the filter after the build phase, e.g. to the JM or a predefined DFS folder. And when the probe scan is scheduled, the JM provides the filter information to the scan when it gets scheduled for execution, or the scan looks in DFS for any filter that it can use as part of initialization. I'm not suggesting to do it exactly in those ways, but just to show what I mean by "bypassing the dataflow".
>>>
>>> Anyways, I'm fine with excluding this optimization from the current FLIP if you believe it would be hard to implement in Flink.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>>
>>>> Hi Stefan,
>>>>
>>>> If I understand correctly (I hope so), the hash join operator needs to send the bloom filter to the probe scan, and the probe scan also needs to send the filtered data to the hash join operator. This means there will be a cycle in the data flow, and it will be hard for current Flink to schedule this kind of graph. I admit we can find a way to do this, but that's probably a bit outside the scope of this FLIP. So let's do these complex optimizations later, WDYT?
>>>>
>>>> Best,
>>>> Lijie
>>>>
>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 18:15:
>>>>
>>>>> Hi Lijie,
>>>>>
>>>>> Exactly, my proposal was to build the bloom filter in the hash operator. I don't know all the details of the implementation of Flink's join operator, but I'd assume that even if the join is a two-input operator it gets scheduled for 2 different pipelines: first the build phase with the scan from the dimension table, and after that's completed the probe phase with the scan of the fact table. I'm not proposing to use the bloom filter only in the join operator, but rather to send the bloom filter to the probe scan before starting the probe. I assume this would require some form of side channel to transport the filter and coordination to tell the sources that such a filter is available. I cannot answer how hard those would be to implement, but the idea doesn't seem impossible to me.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> Now I know what you mean about point 1. But currently it is infeasible for Flink, because the building of the hash table is inside the hash join operator. The hash join operator has two inputs: it will first process the data of the build input to build a hash table, and then use the hash table to process the data of the probe input. If we want to use the built hash table to deduplicate data for the bloom filter, we must put the bloom filter inside the hash join operator. However, in this way, the data reaching the join operator cannot be reduced (the shuffle/network overhead cannot be reduced), which is not what we expected.
>>>>>>
>>>>>> Regarding the filter type, I agree with you, more types of filters can bring further optimization, and it is in our future plan (we described it in the section Future+Improvements#More+underlying+implementations).
>>>>>>
>>>>>> Best,
>>>>>> Lijie
>>>>>>
>>>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 15:58:
>>>>>>
>>>>>>> Hi Lijie,
>>>>>>>
>>>>>>> thanks for your response, I agree with what you said about points 2 and 3. Let me explain a bit more about point 1. This would not apply to all types of joins and my suggestion is also *not* to build a hash table only for the purpose of building the bloom filter.
>>>>>>> I was thinking about the scenario of a hash join, where you would build the hash table as part of the join algorithm anyways and then use the keyset of that hash table to 1) have better insight into the NDV and 2) be able to construct the bloom filter without duplicates and therefore faster. So the preconditions where I would use this are that you are building a hash table as part of the join and you know you are not building for a key column (because there would be no duplicates to eliminate). Then your bloom filter construction could already benefit from the deduplication work that was done for building the hash table.
>>>>>>>
>>>>>>> I also wanted to point out that besides bloom filter and IN filter you could also think of other types of filter that can become interesting for certain distributions and meta data. For example, if you have min/max information about columns and partitions you could have a bit vector represent equilibrium-sized ranges of the key space between min and max, have the bits represent what part of the range is present, and push that information down to the scan.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>> Thanks for your feedback. Let me briefly summarize the optimization points you mentioned above (please correct me if I'm wrong):
>>>>>>>>
>>>>>>>> 1. Build an extra hash table for deduplication before building the bloom filter.
>>>>>>>> 2. Use the two-phase approach to build the bloom filter (first local, then OR-combine).
>>>>>>>> 3. Use blocked bloom filters to improve the cache efficiency.
>>>>>>>>
>>>>>>>> For the above 3 points, I have the following questions or opinions:
>>>>>>>>
>>>>>>>> For point 1, it seems that building a hash table also requires traversing all build side data, and the overhead seems to be the same as building a bloom filter directly? In addition, the hash table will take up more space when the amount of data is large, which is why we choose to use a bloom filter instead of a hash table.
>>>>>>>>
>>>>>>>> For point 2, I think it's a good idea to use the two-phase approach to build the bloom filter. But rather than directly broadcasting the local bloom filters to the probe side, I prefer to introduce a global node for the OR-combine (like two-phase-agg[1]), then broadcast the combined bloom filter to the probe side. The latter can reduce the amount of data transferred over the network. I will change the FLIP like this.
>>>>>>>>
>>>>>>>> For point 3, I think it's a nice optimization, but I prefer to put it into the future improvements. There is already an implementation of bloom filter in Flink, and we can simply reuse it. Introducing a new bloom filter implementation introduces some complexity (we need to implement it, test it, etc.), and is not the focus of this FLIP.
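
The two-phase build discussed for point 2 (local filters per build subtask, then one global OR-combine before broadcasting) can be sketched roughly as follows. This is only an illustrative Python sketch, not Flink's existing bloom filter implementation; `BloomFilter`, `build_local` and `or_combine` are hypothetical names, and the sizes and hash scheme are arbitrary assumptions.

```python
import hashlib


class BloomFilter:
    """Tiny bloom filter backed by a Python int used as a bit set (illustrative only)."""

    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, key: str):
        # Derive num_hashes bit positions via double hashing of one digest.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


def build_local(keys, num_bits: int = 1024, num_hashes: int = 3) -> BloomFilter:
    """Local phase: one build-side subtask builds a filter over its own keys."""
    local = BloomFilter(num_bits, num_hashes)
    for key in keys:
        local.add(key)
    return local


def or_combine(local_filters) -> BloomFilter:
    """Global phase: bitwise-OR all local filters into one filter to broadcast."""
    first = local_filters[0]
    combined = BloomFilter(first.num_bits, first.num_hashes)
    for f in local_filters:
        if (f.num_bits, f.num_hashes) != (first.num_bits, first.num_hashes):
            raise ValueError("filters must share size and hash count")
        combined.bits |= f.bits
    return combined


if __name__ == "__main__":
    subtasks = [["a", "b"], ["c"], ["d", "e", "a"]]
    combined = or_combine([build_local(keys) for keys in subtasks])
    print(all(combined.might_contain(k) for keys in subtasks for k in keys))
```

Because all local filters share the same size and hash functions, the OR of their bit sets is identical to a filter built over the union of the keys, so only one combined filter has to be sent to each probe subtask.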
>>>>>>>>
>>>>>>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Lijie
>>>>>>>>
>>>>>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, Jun 16, 2023 at 16:45:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thanks for the proposal of this feature! I have a question about the filter build and some suggestions for potential improvements. First, I wonder why you suggest running the filter builder as a separate operator with parallelism 1. I'd suggest integrating the distributed filter build with the hash table build phase as follows:
>>>>>>>>>
>>>>>>>>> 1. Build the hash table completely in each subtask.
>>>>>>>>> 2. The keyset of the hash table gives us a precise NDV count for every subtask.
>>>>>>>>> 3. Build a filter from the subtask hash table. For low cardinality tables, I'd go with the suggested optimization of IN-filter.
>>>>>>>>> 4. Each build subtask transfers the local bloom filter to all probe operators.
>>>>>>>>> 5. On the probe operator we can either probe against the individual filters, or we OR-combine all subtask filters into an aggregated bloom filter.
>>>>>>>>>
>>>>>>>>> I'm suggesting this because inserting into a (larger) bloom filter can be costly, especially once the filter exceeds cache sizes, and is therefore better parallelized. First inserting into the hash table also deduplicates the keys, so we avoid inserting records twice into the bloom filter. If we want to improve cache efficiency for the build of larger filters, we could structure them as blocked bloom filters, where the filter is separated into blocks and all bits of one key go only into one block. That allows us to apply software managed buffering to first group keys that go into the same partition (ideally fitting into cache) and then bulk load partitions once we have collected enough keys for one round of loading.
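
To make the blocked bloom filter idea above concrete, here is a rough Python sketch under stated assumptions: the class name `BlockedBloomFilter`, the block size, the hash scheme and the `bulk_load` helper are all hypothetical and do not correspond to any existing Flink class. It only illustrates the two properties described: all bits of a key fall into one block, and keys can be grouped per block before loading.

```python
import hashlib


class BlockedBloomFilter:
    """Bloom filter split into fixed-size blocks; each key touches exactly one block."""

    def __init__(self, num_blocks: int, block_bits: int = 512, num_hashes: int = 4):
        self.num_blocks = num_blocks
        self.block_bits = block_bits
        self.num_hashes = num_hashes
        self.blocks = [0] * num_blocks  # each block is an int used as a bit set

    def _locate(self, key: str):
        # One part of the digest picks the block, the rest picks bits inside it.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        block = int.from_bytes(digest[:8], "big") % self.num_blocks
        h1 = int.from_bytes(digest[8:16], "big")
        h2 = int.from_bytes(digest[16:24], "big") | 1
        mask = 0
        for i in range(self.num_hashes):
            mask |= 1 << ((h1 + i * h2) % self.block_bits)
        return block, mask

    def add(self, key: str) -> None:
        block, mask = self._locate(key)
        self.blocks[block] |= mask

    def might_contain(self, key: str) -> bool:
        block, mask = self._locate(key)
        return (self.blocks[block] & mask) == mask

    def bulk_load(self, keys) -> None:
        # Buffer the bit masks per target block first, then touch each block once.
        pending = {}
        for key in keys:
            block, mask = self._locate(key)
            pending[block] = pending.get(block, 0) | mask
        for block, mask in pending.items():
            self.blocks[block] |= mask


if __name__ == "__main__":
    f = BlockedBloomFilter(num_blocks=8)
    f.bulk_load(f"key-{i}" for i in range(100))
    print(f.might_contain("key-42"))
```

In a real implementation a block would typically be sized to a cache line, so a lookup or insert costs at most one cache miss; the trade-off is a somewhat higher false-positive rate than an unblocked filter of the same total size.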
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> Stefan Richter
>>>>>>>>> Principal Engineer II
>>>>>>>>>
>>>>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, Benchao and Aitozi,
>>>>>>>>>>
>>>>>>>>>> Thanks for your feedback about this FLIP.
>>>>>>>>>>
>>>>>>>>>> @Benchao
>>>>>>>>>>
>>>>>>>>>>>> I think it would be reasonable to also support "pipeline shuffle" if possible.
>>>>>>>>>> As I said above, the runtime filter can work well with all shuffle modes, including pipeline shuffle.
>>>>>>>>>>
>>>>>>>>>>>> if the RuntimeFilterBuilder could be done quickly than RuntimeFilter operator, it can still filter out additional data afterwards.
>>>>>>>>>> I think the main purpose of the runtime filter is to reduce the shuffle data and the data arriving at the join. Although eagerly running the large table side can process data in advance, most of the data may be irrelevant, causing huge shuffle overhead and slowing the join. In addition, if the join is a hash join, the probe side of the hash join also needs to wait for its build side to complete, so the large table side is likely to be back-pressured.
>>>>>>>>>> In addition, I don't tend to add too many configuration options in the first version, which may make it more difficult to use (users need to understand a lot of internal implementation details). Maybe it could be a future improvement (if it's worthwhile)?
>>>>>>>>>>
>>>>>>>>>> @Aitozi
>>>>>>>>>>
>>>>>>>>>>>> IMO, In the current implementation two source table operators will be executed simultaneously.
>>>>>>>>>> The example in the FLIP uses blocking shuffle (I will add this point to the FLIP). The runtime filter is generally chained with the large table side to reduce the shuffle data (as shown in Figure 2 of the FLIP). The job vertices should be scheduled in topological order, so the large table side can only be scheduled after the RuntimeFilterBuilder finishes.
>>>>>>>>>>
>>>>>>>>>>>> Are there some tests to show the default value of table.optimizer.runtime-filter.min-probe-data-size 10G is a good default value.
>>>>>>>>>> It's not tested yet, but it will be done before the code is merged. The current value refers to systems such as Spark and Hive. Before merging the code, we will test on TPC-DS 10T to find an optimal set of values. If you have relevant experience with it, suggestions are welcome.
>>>>>>>>>>
>>>>>>>>>>>> What's the representation of the runtime filter node in planner ?
>>>>>>>>>> As shown in Figure 1 of the FLIP, we intend to add two new physical nodes, RuntimeFilterBuilder and RuntimeFilter.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Lijie
>>>>>>>>>>
>>>>>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
>>>>>>>>>>
>>>>>>>>>>> Hi Lijie,
>>>>>>>>>>>
>>>>>>>>>>> Nice to see this valuable feature. After reading the FLIP I have some questions below:
>>>>>>>>>>>
>>>>>>>>>>>> Schedule the TableSource(dim) first.
>>>>>>>>>>>
>>>>>>>>>>> How does it know to schedule the TableSource(dim) first? IMO, in the current implementation two source table operators will be executed simultaneously.
>>>>>>>>>>>
>>>>>>>>>>>> If the data volume on the probe side is too small, the overhead of building runtime filter is not worth it.
>>>>>>>>>>>
>>>>>>>>>>> Are there some tests to show that the default value of table.optimizer.runtime-filter.min-probe-data-size, 10G, is a good default value? The same goes for table.optimizer.runtime-filter.max-build-data-size.
>>>>>>>>>>>
>>>>>>>>>>>> the runtime filter can be pushed down along the probe side, as close to data sources as possible
>>>>>>>>>>>
>>>>>>>>>>> What's the representation of the runtime filter node in the planner? Is it a Filter node?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Aitozi.
>>>>>>>>>>>
>>>>>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Lijie,
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to also support "pipeline shuffle" if possible.
>>>>>>>>>>>>
>>>>>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing. Although this has not been much exposed to users for now, I know a few companies that use Flink as an MPP computing engine, and there is an ongoing effort[1] to make this usage more powerful.
>>>>>>>>>>>>
>>>>>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes running before the RuntimeFilterBuilder finished, it will not process any data and will occupy resources": whether it benefits us depends on the scale of data. If the RuntimeFilterBuilder finishes more quickly than the RuntimeFilter operator, it can still filter out additional data afterwards. Hence in my opinion, we do not need to make the edge between RuntimeFilterBuilder and RuntimeFilter BLOCKING only; at least it could be made configurable.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
>>>>>>>>>>>>
>>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Yuxia,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I made a mistake in the above response.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The runtime filter can work well with all shuffle modes. However, hybrid shuffle and blocking shuffle are currently recommended for batch jobs (pipeline shuffle is not recommended).
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more thing to mention here is that we will force the edge between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of which BatchShuffleMode is set), because the RuntimeFilter really doesn't need to run before the RuntimeFilterBuilder finished. Even if the RuntimeFilter becomes running before the RuntimeFilterBuilder finished, it will not process any data and will occupy resources.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Lijie
>>>>>>>>>>>>>
>>>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Yuxia,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your feedback. The answers to your questions are as follows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Yes, the row count comes from the statistics of the underlying table (or is estimated based on the statistics of the underlying table, if the build side or probe side is not a TableScan). If the statistics are unavailable, we will not inject a runtime filter (as you said, we can hardly evaluate the benefits). Besides, AFAIK, the estimated data size of the build side is also based on the row count statistics; that is, if the statistics are unavailable, the requirement "table.optimizer.runtime-filter.max-build-data-size" cannot be evaluated either. I'll add this point to the FLIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Estimated data size does not meet the requirement (in planner optimization phase) -> No filter.
>>>>>>>>>>>>>> Estimated data size meets the requirement (in planner optimization phase), but the real data size does not meet the requirement (in execution phase) -> Fake filter.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Lijie
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Lijie for starting this discussion. Excited to see that runtime filter is going to be implemented in Flink.
>>>>>>>>>>>>>>> I have a few questions about it:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row count instead`. So, does the row count come from the statistics of the underlying table? What if the statistics are also unavailable, considering users may not always remember to generate statistics in production?
>>>>>>>>>>>>>>> I'm wondering whether it makes sense to just disable the runtime filter if statistics are unavailable, since in that case we can hardly evaluate the benefits of the runtime filter.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if the following requirements are met:xxx", but it also said, "Once this limit is exceeded, it will output a fake filter(which always returns true)" in the `RuntimeFilterBuilderOperator` part. These seem contradictory, so I'm wondering what the real behavior is: no filter will be injected, or a fake filter?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3: Does it also mean the runtime filter can only take effect in blocking shuffle?
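
The behavior described in Lijie's answers in this thread (no filter when the planner-side requirements are not met or statistics are missing, a fake filter when the real build data turns out too large at execution time) can be sketched as follows. This is a hedged Python illustration, not the FLIP's code: the names are hypothetical, the formula `1 - buildNdv / probeNdv` is only inferred from the numbers in Jing's message (probeNdv 1000 times buildNdv gives 0.999), and modelling the runtime limit as the same max-build-data-size is an assumption. The defaults (10MB, 10GB, 0.5) are the values quoted in the thread.

```python
from dataclasses import dataclass
from typing import Optional

MB = 1024 ** 2
GB = 1024 ** 3


@dataclass
class JoinSideStats:
    data_size: Optional[int]           # estimated bytes; None if statistics are missing
    ndv: Optional[float] = None        # number of distinct join keys, if known
    row_count: Optional[float] = None  # fallback when ndv is unknown


def filter_ratio(build: JoinSideStats, probe: JoinSideStats) -> Optional[float]:
    """Assumed formula: 1 - build / probe, using ndv and falling back to row count."""
    if build.ndv is not None and probe.ndv is not None:
        b, p = build.ndv, probe.ndv
    elif build.row_count is not None and probe.row_count is not None:
        b, p = build.row_count, probe.row_count
    else:
        return None
    if p <= 0:
        return None
    return 1.0 - b / p


def should_inject(build: JoinSideStats, probe: JoinSideStats,
                  max_build_size: int = 10 * MB,
                  min_probe_size: int = 10 * GB,
                  min_filter_ratio: float = 0.5) -> bool:
    """Planner phase: inject only if every estimated requirement is met."""
    if build.data_size is None or probe.data_size is None:
        return False  # statistics unavailable -> no filter
    ratio = filter_ratio(build, probe)
    if ratio is None:
        return False
    return (build.data_size <= max_build_size
            and probe.data_size >= min_probe_size
            and ratio >= min_filter_ratio)


def runtime_filter_kind(actual_build_size: int, max_build_size: int = 10 * MB) -> str:
    """Execution phase: fall back to a fake (always-true) filter if the limit is exceeded."""
    return "bloom" if actual_build_size <= max_build_size else "fake"


if __name__ == "__main__":
    small = JoinSideStats(data_size=10 * MB, ndv=1_000)
    large = JoinSideStats(data_size=10 * GB, ndv=1_000_000)
    print(should_inject(small, large), runtime_filter_kind(20 * MB))
```

With these assumed defaults, the worst case from Jing's message still yields a ratio far above 0.5, which is the point raised about how much control min-filter-ratio actually gives users.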
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>> Yuxia
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ----- Original message -----
>>>>>>>>>>>>>>> From: "ron9 liu" <ron9....@gmail.com>
>>>>>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
>>>>>>>>>>>>>>> Sent: Wednesday, June 14, 2023, 5:29:28 PM
>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Lijie for starting this discussion. Runtime Filter is a common optimization to improve join performance that has been adopted by many computing engines such as Spark, Doris, etc. Flink is a unified streaming and batch computing engine, and we are continuously optimizing its batch performance. Runtime filter is a general performance optimization technique that can improve the performance of Flink batch jobs, so we are introducing it on batch as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looking forward to all feedback.
> >
> > Best,
> > Ron
> >
> > Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
> >
> > > Hi devs,
> > >
> > > Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
> > > Introduce Runtime Filter for Flink Batch Jobs[1].
> > >
> > > Runtime Filter is a common optimization to improve join performance. It
> > > is designed to dynamically generate filter conditions for certain join
> > > queries at runtime to reduce the amount of scanned or shuffled data,
> > > avoid unnecessary I/O and network transmission, and speed up the query.
> > > Its working principle is to first build a filter (e.g. a Bloom filter)
> > > based on the data on the small table side (build side), then pass this
> > > filter to the large table side (probe side) to filter out the irrelevant
> > > data there. This reduces the data reaching the join and improves
> > > performance.
> > >
> > > You can find more details in FLIP-324[1].
> > > Looking forward to your feedback.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> > >
> > > Best,
> > > Ron & Gen & Lijie
>
> --
>
> Best,
> Benchao Li
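
For readers following the thread, the working principle described in the original proposal, together with the "fake filter (which always returns true)" fallback raised in question 2, can be pictured with a small sketch. This is only an illustration under stated assumptions: the class `RuntimeFilterSketch`, the methods `buildFilter` and `filterProbe`, the `maxBuildRows` limit, and the toy two-probe Bloom filter are all hypothetical names invented here, not Flink's operators or the FLIP's actual design.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.function.LongPredicate;

// Illustrative sketch only: every name here is hypothetical, not Flink's API.
class RuntimeFilterSketch {

    /** Minimal Bloom filter over long join keys, using two hash probes per key. */
    static final class SimpleBloomFilter {
        private final BitSet bits;
        private final int numBits;

        SimpleBloomFilter(int numBits) {
            this.numBits = numBits;
            this.bits = new BitSet(numBits);
        }

        void add(long key) {
            bits.set(index(key, 0));
            bits.set(index(key, 1));
        }

        /** May return true for absent keys, but never false for added keys. */
        boolean mightContain(long key) {
            return bits.get(index(key, 0)) && bits.get(index(key, 1));
        }

        private int index(long key, int seed) {
            // Simple multiplicative mixing; chosen for brevity, not quality.
            long h = key * 0x9E3779B97F4A7C15L + seed * 0xC2B2AE3D27D4EB4FL;
            h ^= (h >>> 32);
            return (int) Math.floorMod(h, (long) numBits);
        }
    }

    /**
     * Builds a filter from the build-side keys. If the build side turns out to
     * be larger than the assumed limit, falls back to a filter that accepts
     * every key, so the probe side passes through unchanged.
     */
    static LongPredicate buildFilter(List<Long> buildKeys, int maxBuildRows, int numBits) {
        if (buildKeys.size() > maxBuildRows) {
            return key -> true; // the "always true" fallback
        }
        SimpleBloomFilter bloom = new SimpleBloomFilter(numBits);
        for (long key : buildKeys) {
            bloom.add(key);
        }
        return bloom::mightContain;
    }

    /** Applies the filter to the probe side before it would be shuffled to the join. */
    static List<Long> filterProbe(List<Long> probeKeys, LongPredicate filter) {
        List<Long> kept = new ArrayList<>();
        for (long key : probeKeys) {
            if (filter.test(key)) {
                kept.add(key);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Long> build = Arrays.asList(1L, 2L, 3L);
        List<Long> probe = new ArrayList<>();
        for (long k = 1; k <= 1000; k++) {
            probe.add(k);
        }
        LongPredicate filter = buildFilter(build, 10, 1 << 16);
        System.out.println("probe rows kept: " + filterProbe(probe, filter).size());
    }
}
```

Because a Bloom filter never reports a false negative, every probe row that has a match on the build side survives the filter, so the join result is unchanged and only the shuffled volume shrinks. The fallback branch shows why the two statements in question 2 need not conflict in this sketch: the decision to plan a filter is made up front, while the always-true filter only covers the case where the build side is larger at runtime than expected.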