Hi all,

I've updated the FLIP according to the above discussion. The changes mainly include:
1. Declare the supported shuffle types.
2. Build the runtime filters using a two-phase approach.
3. Add more potential future improvements.
Best,
Lijie

liu ron <ron9....@gmail.com> wrote on Tue, Jun 20, 2023 at 11:19:

> Hi, Jing
>
> Thanks for your feedback.
>
> > Afaiu, the runtime filter will only be injected when the gap between the build data size and probe data size is big enough. Let's make an extreme example. If the small table (build side) has one row and the large table (probe side) contains tens of billions of rows. This will be the ideal use case for the runtime filter and the improvement will be significant. Is this correct?
>
> Yes, you are right.
>
> > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will the value of max-build-data-size and min-probe-data-size depend on the parallelism config? I.e. with the same data-size setting, is it possible to inject or not inject runtime filters by adjusting the parallelism?
>
> First, let me clarify two points. The first is that the runtime filter decides whether to inject or not in the optimization phase, but we currently do not consider operator parallelism in the SQL optimization phase; parallelism is set at the ExecNode level. The second is that in batch mode the AdaptiveBatchScheduler[1] is used by default, which derives the parallelism of a downstream operator from the amount of data produced by the upstream operator, that is, the parallelism is determined by runtime adaptation. In the above case, we cannot decide in the optimization stage whether to inject the BloomFilter based on parallelism.
> A more important point is that the purpose of the runtime filter is to reduce the amount of shuffled data, and thus the amount of data processed by the downstream join operator. Therefore, regardless of the parallelism of the probe side, the amount of shuffled data will be reduced after inserting the runtime filter, which is beneficial to the join operator, so whether to insert the RuntimeFilter or not does not depend on the parallelism.
>
> > Does it make sense to reconsider the formula of ratio calculation to help users easily control the filter injection?
>
> Only when ndv does not exist will row count be considered. When size uses the default value and ndv cannot be taken, it is true that this condition may always hold, but this does not seem to affect anything, and the user is also likely to change the value of the size. One question: how do you think we should make it easier for users to control the filter injection?
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
>
> Best,
> Ron
>
> Jing Ge <j...@ververica.com.invalid> wrote on Tue, Jun 20, 2023 at 07:11:
>
> > Hi Lijie,
> >
> > Thanks for your proposal. It is a really nice feature. I'd like to ask a few questions to understand your thoughts.
> >
> > Afaiu, the runtime filter will only be injected when the gap between the build data size and probe data size is big enough. Let's make an extreme example. If the small table (build side) has one row and the large table (probe side) contains tens of billions of rows. This will be the ideal use case for the runtime filter and the improvement will be significant. Is this correct?
> >
> > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will the value of max-build-data-size and min-probe-data-size depend on the parallelism config? I.e. with the same data-size setting, is it possible to inject or not inject runtime filters by adjusting the parallelism?
> > In the FLIP, there are default values for the new configuration parameters that will be used to check the injection condition. If ndv cannot be estimated, row count will be used. Given that max-build-data-size is 10MB and min-probe-data-size is 10GB, in the worst case the min-filter-ratio will be 0.999, i.e. the probeNdv is 1000 times buildNdv. If we consider the duplication and the fact that the large table might have more columns than the small table, the probeNdv should still be 100 or 10 times buildNdv, which ends up with a min-filter-ratio equal to 0.99 or 0.9. Both are bigger than the default value 0.5 in the FLIP. If I am not mistaken, commonly a min-filter-ratio less than 0.99 will always allow injecting the runtime filter. Does it make sense to reconsider the formula of ratio calculation to help users easily control the filter injection?
> >
> > Best regards,
> > Jing
> >
> > On Mon, Jun 19, 2023 at 4:42 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
> >
> > > Hi Stefan,
> > >
> > > >> bypassing the dataflow
> > > I believe it's a possible solution, but it may require more coordination and extra conditions (such as DFS), so I do think it should be excluded from the first version. I'll put it in Future Improvements as a potential improvement.
> > >
> > > Thanks again for your quick reply :)
> > >
> > > Best,
> > > Lijie
> > >
> > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 20:51:
> > >
> > > > Hi Lijie,
> > > >
> > > > I think you understood me correctly. But I would not consider this a true cyclic dependency in the dataflow, because I would not suggest sending the filter through an edge in the job graph from join to scan. I'd rather bypass the stream graph for bringing the filter to the scan. For example, the join could report the filter after the build phase, e.g. to the JM or a predefined DFS folder. And when the probe scan gets scheduled for execution, the JM provides the filter information to the scan, or the scan looks in DFS to see if it can find any filter that it can use as part of initialization. I'm not suggesting to do it exactly in those ways, but just to show what I mean by "bypassing the dataflow".
> > > >
> > > > Anyways, I'm fine with excluding this optimization from the current FLIP if you believe it would be hard to implement in Flink.
> > > >
> > > > Best,
> > > > Stefan
> > > >
> > > > > On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >
> > > > > Hi Stefan,
> > > > >
> > > > > If I understand correctly (I hope so), the hash join operator needs to send the bloom filter to the probe scan, and the probe scan also needs to send the filtered data to the hash join operator. This means there will be a cycle in the data flow, and it will be hard for current Flink to schedule this kind of graph. I admit we can find a way to do this, but that's probably a bit outside the scope of this FLIP. So let's do these complex optimizations later, WDYT?
> > > > >
> > > > > Best,
> > > > > Lijie
> > > > >
> > > > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 18:15:
> > > > >
> > > > >> Hi Lijie,
> > > > >>
> > > > >> Exactly, my proposal was to build the bloom filter in the hash operator. I don't know all the details of the implementation of Flink's join operator, but I'd assume that even if the join is a two-input operator, it gets scheduled as 2 different pipelines: first the build phase with the scan from the dimension table, and after that's completed the probe phase with the scan of the fact table. I'm not proposing to use the bloom filter only in the join operator, but rather to send the bloom filter to the probe scan before starting the probe. I assume this would require some form of side channel to transport the filter and coordination to tell the sources that such a filter is available. I cannot answer how hard those would be to implement, but the idea doesn't seem impossible to me.
> > > > >>
> > > > >> Best,
> > > > >> Stefan
> > > > >>
> > > > >>> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >>>
> > > > >>> Hi Stefan,
> > > > >>>
> > > > >>> Now I know what you mean about point 1. But currently it is unfeasible for Flink, because the building of the hash table is inside the hash join operator. The hash join operator has two inputs; it will first process the data of the build input to build a hash table, and then use the hash table to process the data of the probe input. If we want to use the built hash table to deduplicate data for the bloom filter, we must put the bloom filter inside the hash join operator. However, in this way, the data reaching the join operator cannot be reduced (the shuffle/network overhead cannot be reduced), which is not what we expected.
> > > > >>>
> > > > >>> Regarding the filter type, I agree with you, more types of filters can get further optimization, and it is in our future plan (we described it in the section "Future Improvements # More underlying implementations").
> > > > >>>
> > > > >>> Best,
> > > > >>> Lijie
> > > > >>>
> > > > >>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 15:58:
> > > > >>>
> > > > >>>> Hi Lijie,
> > > > >>>>
> > > > >>>> Thanks for your response, I agree with what you said about points 2 and 3. Let me explain a bit more about point 1. This would not apply to all types of joins, and my suggestion is also *not* to build a hash table only for the purpose of building the bloom filter.
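For illustration, a minimal sketch of the point-1 idea that the next paragraph elaborates: the hash table built for the join already collapses duplicate join keys, so its key set gives an exact per-subtask NDV and lets each distinct key be inserted into the filter exactly once. The classes below are toys with made-up names, not Flink's actual hash join or bloom filter:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy sketch: reuse the join hash table's key set to build a deduplicated bloom filter. */
public class KeySetFilterSketch {

    public static void main(String[] args) {
        // Build-side rows as (joinKey, payload); note the duplicate join key 1.
        List<long[]> buildRows =
                List.of(new long[] {1, 10}, new long[] {1, 11}, new long[] {2, 20});

        // The hash table is built anyway as part of the hash join. A real join keeps all
        // rows per key; counting rows per key is enough to show the deduplicated key set.
        Map<Long, Integer> hashTable = new HashMap<>();
        for (long[] row : buildRows) {
            hashTable.merge(row[0], 1, Integer::sum);
        }

        // The key set size is an exact NDV for this subtask, and each distinct key is
        // inserted into the (toy, single-hash) filter only once.
        int numBits = 1 << 16;
        BitSet filter = new BitSet(numBits);
        for (long key : hashTable.keySet()) {
            filter.set(Math.floorMod(Long.hashCode(key * 0x9E3779B97F4A7C15L), numBits));
        }

        System.out.println("exact NDV: " + hashTable.size());     // 2, not 3
        System.out.println("bits set:  " + filter.cardinality()); // at most 2
    }
}
```

As the reply above notes, the trade-off is that this only works if the filter is built inside the join operator, i.e. after the shuffle has already happened.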
> > > > >>>> I was thinking about the scenario of a hash join, where you would build the hash table as part of the join algorithm anyway and then use the keyset of that hash table to 1) have better insights about NDV and 2) be able to construct the bloom filter without duplicates and therefore faster. So the precondition for using this is that you are building a hash table as part of the join and you know you are not building on a unique key column (for a key column there would be no duplicates to eliminate). Then your bloom filter construction could already benefit from the deduplication work that was done for building the hash table.
> > > > >>>>
> > > > >>>> I also wanted to point out that besides the bloom filter and the IN filter, you could also think of other types of filters that can become interesting for certain distributions and metadata. For example, if you have min/max information about columns and partitions, you could have a bit vector represent equal-sized ranges of the key space between min and max, have the bits represent which parts of the range are present, and push that information down to the scan.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Stefan
> > > > >>>>
> > > > >>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>> Hi Stefan,
> > > > >>>>>
> > > > >>>>> Thanks for your feedback. Let me briefly summarize the optimization points you mentioned above (please correct me if I'm wrong):
> > > > >>>>>
> > > > >>>>> 1. Build an extra hash table for deduplication before building the bloom filter.
> > > > >>>>> 2. Use the two-phase approach to build the bloom filter (first local, then OR-combine).
> > > > >>>>> 3. Use blocked bloom filters to improve the cache efficiency.
> > > > >>>>>
> > > > >>>>> For the above 3 points, I have the following questions or opinions:
> > > > >>>>>
> > > > >>>>> For point 1, it seems that building a hash table also requires traversing all build-side data, and the overhead seems to be the same as building a bloom filter directly? In addition, the hash table will take up more space when the amount of data is large, which is why we choose to use a bloom filter instead of a hash table.
> > > > >>>>>
> > > > >>>>> For point 2, I think it's a good idea to use the two-phase approach to build the bloom filter. But rather than directly broadcasting the local bloom filter to the probe side, I prefer to introduce a global node for the OR-combine (like two-phase agg[1]), then broadcast the combined bloom filter to the probe side. The latter can reduce the amount of data transferred over the network. I will change the FLIP like this.
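For illustration, a minimal sketch of the two-phase build preferred above: each build subtask fills a local filter, a single global stage OR-combines them, and only the combined filter is broadcast to the probe side. Plain java.util.BitSet stands in for the real bloom filter; none of the names below are the FLIP's actual operators:

```java
import java.util.BitSet;
import java.util.List;

/** Toy two-phase build: local bloom-filter bit sets are OR-combined into one global filter. */
public class OrCombineSketch {

    private static final int NUM_BITS = 1 << 16;
    private static final int NUM_HASHES = 3;

    /** Phase 1: each build subtask inserts its local keys into its own bit set. */
    static BitSet buildLocalFilter(List<Long> localKeys) {
        BitSet bits = new BitSet(NUM_BITS);
        for (long key : localKeys) {
            for (int i = 0; i < NUM_HASHES; i++) {
                bits.set(bitIndex(key, i));
            }
        }
        return bits;
    }

    /** Phase 2: the global "OR-combine" stage merges all local filters into one. */
    static BitSet orCombine(List<BitSet> localFilters) {
        BitSet global = new BitSet(NUM_BITS);
        for (BitSet local : localFilters) {
            global.or(local); // a key present in any local filter stays present globally
        }
        return global;
    }

    /** Probe side: a key can possibly join only if all of its bits are set. */
    static boolean mightContain(BitSet filter, long key) {
        for (int i = 0; i < NUM_HASHES; i++) {
            if (!filter.get(bitIndex(key, i))) {
                return false;
            }
        }
        return true;
    }

    private static int bitIndex(long key, int seed) {
        long h = key * 0x9E3779B97F4A7C15L + seed;
        h ^= (h >>> 32);
        return Math.floorMod((int) h, NUM_BITS);
    }

    public static void main(String[] args) {
        BitSet subtask0 = buildLocalFilter(List.of(1L, 2L, 3L));
        BitSet subtask1 = buildLocalFilter(List.of(100L, 200L));
        BitSet global = orCombine(List.of(subtask0, subtask1));

        System.out.println(mightContain(global, 2L));   // true
        System.out.println(mightContain(global, 200L)); // true
        System.out.println(mightContain(global, 42L));  // false with high probability
    }
}
```

Broadcasting only the combined filter means the network carries roughly one filter per probe task instead of one per build-subtask/probe-task pair, which is the data-transfer saving mentioned above.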
> > > > >>>>>
> > > > >>>>> For point 3, I think it's a nice optimization, but I prefer to put it into the future improvements. There is already an implementation of a bloom filter in Flink, and we can simply reuse it. Introducing a new bloom filter implementation adds some complexity (we need to implement it, test it, etc.), and it is not the focus of this FLIP.
> > > > >>>>>
> > > > >>>>> [1]
> > > > >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Lijie
> > > > >>>>>
> > > > >>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, Jun 16, 2023 at 16:45:
> > > > >>>>>
> > > > >>>>>> Hi,
> > > > >>>>>>
> > > > >>>>>> Thanks for the proposal of this feature! I have a question about the filter build and some suggestions for potential improvements. First, I wonder why you suggest running the filter builder as a separate operator with parallelism 1. I'd suggest integrating the distributed filter build with the hash table build phase as follows:
> > > > >>>>>>
> > > > >>>>>> 1. Build the hash table completely in each subtask.
> > > > >>>>>> 2. The keyset of the hash table gives us a precise NDV count for every subtask.
> > > > >>>>>> 3. Build a filter from the subtask hash table. For low-cardinality tables, I'd go with the suggested optimization of the IN-filter.
> > > > >>>>>> 4. Each build subtask transfers the local bloom filter to all probe operators.
> > > > >>>>>> 5. On the probe operator we can either probe against the individual filters, or we OR-combine all subtask filters into an aggregated bloom filter.
> > > > >>>>>>
> > > > >>>>>> I'm suggesting this because building/inserting into a (larger) bloom filter can be costly, especially once the filter exceeds cache sizes, and is therefore better parallelized. First inserting into the hash table also deduplicates the keys, so we avoid inserting records twice into the bloom filter. If we want to improve cache efficiency for the build of larger filters, we could structure them as blocked bloom filters, where the filter is separated into blocks and all bits of one key go only into one block.
> > > > >>>>>> That allows us to apply software-managed buffering to first group keys that go into the same partition (ideally fitting into cache) and then bulk-load partitions once we have collected enough keys for one round of loading.
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Stefan
> > > > >>>>>>
> > > > >>>>>> Stefan Richter
> > > > >>>>>> Principal Engineer II
> > > > >>>>>>
> > > > >>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >>>>>>>
> > > > >>>>>>> Hi, Benchao and Aitozi,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for your feedback about this FLIP.
> > > > >>>>>>>
> > > > >>>>>>> @Benchao
> > > > >>>>>>>
> > > > >>>>>>>>> I think it would be reasonable to also support "pipeline shuffle" if possible.
> > > > >>>>>>> As I said above, the runtime filter can work well with all shuffle modes, including pipeline shuffle.
> > > > >>>>>>>
> > > > >>>>>>>>> If the RuntimeFilterBuilder could be done more quickly than the RuntimeFilter operator, it can still filter out additional data afterwards.
> > > > >>>>>>> I think the main purpose of the runtime filter is to reduce the shuffled data and the data arriving at the join. Although eagerly running the large table side can process data in advance, most of that data may be irrelevant, causing huge shuffle overhead and slowing down the join. In addition, if the join is a hash join, the probe side of the hash join also needs to wait for its build side to complete, so the large table side is likely to be backpressured.
> > > > >>>>>>> In addition, I don't tend to add too many configuration options in the first version, which may make it more difficult to use (users would need to understand a lot of internal implementation details). Maybe it could be a future improvement (if it's worthwhile)?
> > > > >>>>>>>
> > > > >>>>>>> @Aitozi
> > > > >>>>>>>
> > > > >>>>>>>>> IMO, in the current implementation two source table operators will be executed simultaneously.
> > > > >>>>>>> The example in the FLIP uses blocking shuffle (I will add this point to the FLIP). The runtime filter is generally chained with the large table side to reduce the shuffled data (as shown in Figure 2 of the FLIP). The job vertices should be scheduled in topological order, so the large table side can only be scheduled after the RuntimeFilterBuilder finishes.
> > > > >>>>>>>
> > > > >>>>>>>>> Are there some tests to show that the default value of table.optimizer.runtime-filter.min-probe-data-size 10G is a good default value?
> > > > >>>>>>> It's not tested yet, but it will be done before merging the code. The current values refer to systems such as Spark and Hive. Before code merging, we will test on TPC-DS 10T to find an optimal set of values. If you have relevant experience with this, you are welcome to give some suggestions.
> > > > >>>>>>>
> > > > >>>>>>>>> What's the representation of the runtime filter node in the planner?
> > > > >>>>>>> As shown in Figure 1 of the FLIP, we intend to add two new physical nodes, RuntimeFilterBuilder and RuntimeFilter.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Lijie
> > > > >>>>>>>
> > > > >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Lijie,
> > > > >>>>>>>>
> > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP I have some questions below:
> > > > >>>>>>>>
> > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > >>>>>>>>
> > > > >>>>>>>> How does it know to schedule the TableSource(dim) first? IMO, in the current implementation two source table operators will be executed simultaneously.
> > > > >>>>>>>>
> > > > >>>>>>>>> If the data volume on the probe side is too small, the overhead of building the runtime filter is not worth it.
> > > > >>>>>>>>
> > > > >>>>>>>> Are there some tests to show that the default value of table.optimizer.runtime-filter.min-probe-data-size 10G is a good default value?
> > > > >>>>>>>> The same question for table.optimizer.runtime-filter.max-build-data-size.
> > > > >>>>>>>>
> > > > >>>>>>>>> the runtime filter can be pushed down along the probe side, as close to data sources as possible
> > > > >>>>>>>>
> > > > >>>>>>>> What's the representation of the runtime filter node in the planner? Is it a Filter node?
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>>
> > > > >>>>>>>> Aitozi.
> > > > >>>>>>>>
> > > > >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Lijie,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to also support "pipeline shuffle" if possible.
> > > > >>>>>>>>>
> > > > >>>>>>>>> "Pipeline shuffle" is essential for OLAP/MPP computing. Although this has not been much exposed to users for now, I know a few companies that use Flink as an MPP computing engine, and there is an ongoing effort[1] to make this usage more powerful.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes running before the RuntimeFilterBuilder finished, it will not process any data and will occupy resources": whether it benefits us depends on the scale of the data. If the RuntimeFilterBuilder could be done more quickly than the RuntimeFilter operator, it can still filter out additional data afterwards. Hence, in my opinion, we do not need to make the edge between RuntimeFilterBuilder and RuntimeFilter BLOCKING only; at least it could be configurable.
> > > > >>>>>>>>>
> > > > >>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
> > > > >>>>>>>>>
> > > > >>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Yuxia,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I made a mistake in the above response.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The runtime filter can work well with all shuffle modes.
> > > > >>>>>>>>>> However, hybrid shuffle and blocking shuffle are currently recommended for batch jobs (pipeline shuffle is not recommended).
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> One more thing to mention here is that we will force the edge between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of which BatchShuffleMode is set), because the RuntimeFilter really doesn't need to run before the RuntimeFilterBuilder finishes. Even if the RuntimeFilter becomes running before the RuntimeFilterBuilder finishes, it will not process any data and will occupy resources.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Best,
> > > > >>>>>>>>>> Lijie
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi Yuxia,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for your feedback. The answers to your questions are as follows:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 1. Yes, the row count comes from the statistics of the underlying table (or is estimated based on the statistics of the underlying table, if the build side or probe side is not a TableScan). If the statistics are unavailable, we will not inject a runtime filter (as you said, we can hardly evaluate the benefits). Besides, AFAIK, the estimated data size of the build side is also based on the row count statistics, that is, if the statistics are unavailable, the requirement "table.optimizer.runtime-filter.max-build-data-size" cannot be evaluated either. I'll add this point to the FLIP.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 2.
> > > > >>>>>>>>>>> Estimated data size does not meet the requirement (in planner optimization phase) -> no filter.
> > > > >>>>>>>>>>> Estimated data size meets the requirement (in planner optimization phase), but the real data size does not meet the requirement (in execution phase) -> fake filter.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
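For illustration, a rough sketch of the two-level decision described in answers 1 and 2: the planner skips injection entirely when the estimates don't meet the requirements, while the builder falls back to a pass-through "fake" filter at execution time once the real build side exceeds the limit. The class and method names below are illustrative, not the FLIP's actual implementation; the threshold values merely mirror the numbers discussed in this thread:

```java
import java.util.BitSet;
import java.util.function.LongPredicate;

/** Toy sketch of "no filter" (planner time) vs. "fake filter" (execution time). */
public class FilterDecisionSketch {

    /** A filter that always returns true: shipping it to the probe side filters nothing out. */
    static final LongPredicate FAKE_FILTER = key -> true;

    /** Planner-time check on estimates; if it fails, no runtime filter is injected at all. */
    static boolean shouldInject(
            long estBuildBytes, long estProbeBytes, double estFilterRatio,
            long maxBuildBytes, long minProbeBytes, double minFilterRatio) {
        return estBuildBytes <= maxBuildBytes
                && estProbeBytes >= minProbeBytes
                && estFilterRatio >= minFilterRatio;
    }

    /** Execution-time build; if the real build side is larger than expected, fall back to a fake filter. */
    static LongPredicate buildFilter(long[] buildKeys, long maxBuildRows) {
        if (buildKeys.length > maxBuildRows) {
            return FAKE_FILTER; // always true, so downstream behaves as if no filter existed
        }
        int numBits = 1 << 16;
        BitSet bits = new BitSet(numBits);
        for (long key : buildKeys) {
            bits.set(Math.floorMod(Long.hashCode(key), numBits));
        }
        return key -> bits.get(Math.floorMod(Long.hashCode(key), numBits));
    }

    public static void main(String[] args) {
        // Estimates look good (5MB build, 20GB probe, ratio 0.9) -> the planner injects the filter nodes.
        System.out.println(shouldInject(5 << 20, 20L << 30, 0.9, 10 << 20, 10L << 30, 0.5));

        // At runtime the build side turned out too large -> fake filter, nothing is dropped.
        LongPredicate filter = buildFilter(new long[] {1, 2, 3, 4, 5}, 3);
        System.out.println(filter.test(42)); // true: the fake filter lets everything pass
    }
}
```

The point of the fake filter is that the plan shape chosen by the planner stays valid even when the runtime data turns out larger than estimated; the filter simply stops filtering.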
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> Lijie
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion. Excited to see that runtime filter is to be implemented in Flink.
> > > > >>>>>>>>>>>> I have a few questions about it:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row count instead`. So, does the row count come from the statistics of the underlying table? What if the statistics are also unavailable, considering that users may not always remember to generate statistics in production?
> > > > >>>>>>>>>>>> I'm wondering whether it makes sense to just disable the runtime filter if statistics are unavailable, since in that case we can hardly evaluate the benefits of the runtime filter.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if the following requirements are met: xxx", but it also said, "Once this limit is exceeded, it will output a fake filter (which always returns true)" in the `RuntimeFilterBuilderOperator` part. They seem contradictory, so I'm wondering what the real behavior is: will no filter be injected, or a fake filter?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 3: Does it also mean the runtime filter can only take effect in blocking shuffle?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best regards,
> > > > >>>>>>>>>>>> Yuxia
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> ----- Original Message -----
> > > > >>>>>>>>>>>> From: "ron9 liu" <ron9....@gmail.com>
> > > > >>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
> > > > >>>>>>>>>>>> Sent: Wednesday, June 14, 2023, 5:29:28 PM
> > > > >>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion.
> > > > >>>>>>>>>>>> Runtime Filter is a common optimization to improve join performance that has been adopted by many computing engines such as Spark, Doris, etc. Flink is a unified streaming and batch computing engine, and we are continuously optimizing the performance of batch jobs. Runtime filter is a general performance optimization technique that can improve the performance of Flink batch jobs, so we are introducing it for batch as well.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Looking forward to all feedback.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>> Ron
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi devs,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324: Introduce Runtime Filter for Flink Batch Jobs[1].
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Runtime Filter is a common optimization to improve join performance. It is designed to dynamically generate filter conditions for certain Join queries at runtime, to reduce the amount of scanned or shuffled data, avoid unnecessary I/O and network transmission, and speed up the query. Its working principle is to first build a filter (e.g. a bloom filter) based on the data on the small table side (build side), then pass this filter to the large table side (probe side) to filter out the irrelevant data there. This reduces the data reaching the join and improves performance.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> You can find more details in FLIP-324[1]. Looking forward to your feedback.
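For illustration, a compact end-to-end sketch of the working principle described above: a (toy) bloom filter is built from the small build side and then used to drop probe-side rows that cannot possibly find a join partner before they reach the shuffle and the join. This is a self-contained toy, not the FLIP's implementation:

```java
import java.util.BitSet;
import java.util.List;
import java.util.stream.Collectors;

/** Toy end-to-end sketch: a build-side filter prunes the probe side before the join. */
public class RuntimeFilterPrincipleSketch {

    private static final int NUM_BITS = 1 << 16;
    private static final int NUM_HASHES = 2;

    static BitSet buildFilter(List<Long> buildKeys) {
        BitSet bits = new BitSet(NUM_BITS);
        for (long key : buildKeys) {
            for (int i = 0; i < NUM_HASHES; i++) {
                bits.set(bitIndex(key, i));
            }
        }
        return bits;
    }

    static boolean mightJoin(BitSet filter, long probeKey) {
        for (int i = 0; i < NUM_HASHES; i++) {
            if (!filter.get(bitIndex(probeKey, i))) {
                return false; // definitely no matching build row
            }
        }
        return true; // possibly matches (bloom filters allow false positives, never false negatives)
    }

    private static int bitIndex(long key, int seed) {
        long h = (key + seed) * 0x9E3779B97F4A7C15L;
        return Math.floorMod((int) (h ^ (h >>> 32)), NUM_BITS);
    }

    public static void main(String[] args) {
        // Small dimension table (build side) and a much larger fact table (probe side).
        List<Long> buildKeys = List.of(7L, 8L, 9L);
        List<Long> probeKeys = List.of(1L, 7L, 2L, 8L, 3L, 9L, 4L, 5L);

        BitSet filter = buildFilter(buildKeys);
        List<Long> survivingProbeKeys =
                probeKeys.stream().filter(k -> mightJoin(filter, k)).collect(Collectors.toList());

        // Only rows that can possibly join are shuffled to the join operator.
        System.out.println(survivingProbeKeys); // [7, 8, 9] (plus rare false positives)
    }
}
```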
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> [1]
> > > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>> Ron & Gen & Lijie
> > > > >>>>>>>>>
> > > > >>>>>>>>> --
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best,
> > > > >>>>>>>>> Benchao Li