Hi all,

I've updated the FLIP according to the above discussion. The changes mainly
include:
1. Declare the supported shuffle types.
2. Build the runtime filters using a two-phase approach.
3. Add more potential future improvements.

Best,
Lijie

liu ron <ron9....@gmail.com> wrote on Tue, Jun 20, 2023 at 11:19:

> Hi, Jing
>
> Thanks for your feedback.
>
> > Afaiu, the runtime filter will only be injected when the gap between the
> > build data size and probe data size is big enough. Let's make an extreme
> > example: if the small table (build side) has one row and the large
> > table (probe side) contains tens of billions of rows, this will be the
> > ideal use case for the runtime filter and the improvement will be
> > significant. Is this correct?
>
> Yes, you are right.
>
> > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will
> > the value of max-build-data-size and min-probe-data-size depend on the
> > parallelism config? I.e., with the same data-size setting, is it possible
> > to inject or not inject runtime filters by adjusting the parallelism?
>
> First, let me clarify two points. The first is that whether to inject a
> runtime filter is decided in the SQL optimization phase, but we currently
> do not consider operator parallelism there (parallelism is set at the
> ExecNode level). The second is that in batch mode, the
> AdaptiveBatchScheduler[1] is now used by default; it derives the
> parallelism of a downstream operator from the amount of data produced by
> the upstream operator, that is, the parallelism is determined adaptively at
> runtime. Given these two points, we cannot base the decision of whether to
> inject a BloomFilter on parallelism in the optimization stage.
> A more important point is that the purpose of the runtime filter is to
> reduce the amount of shuffled data, and thus the amount of data processed
> by the downstream join operator. Therefore, regardless of the probe side's
> parallelism, the amount of shuffled data will be reduced after inserting
> the runtime filter, which is beneficial to the join operator. So whether to
> insert the runtime filter does not depend on the parallelism.
>
> > Does it make sense to reconsider the formula of ratio
> > calculation to help users easily control the filter injection?
>
> Row count is only considered when the ndv is unavailable. When the sizes
> use the default values and the ndv cannot be obtained, it is true that this
> condition may always hold, but this does not seem to cause any harm, and
> the user is also likely to change the size values. One question: how do you
> think we could make it easier for users to control the filter injection?
>
>
> [1]:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
>
> Best,
> Ron
>
> Jing Ge <j...@ververica.com.invalid> wrote on Tue, Jun 20, 2023 at 07:11:
>
> > Hi Lijie,
> >
> > Thanks for your proposal. It is a really nice feature. I'd like to ask a
> > few questions to understand your thoughts.
> >
> > Afaiu, the runtime filter will only be injected when the gap between the
> > build data size and probe data size is big enough. Let's make an extreme
> > example: if the small table (build side) has one row and the large
> > table (probe side) contains tens of billions of rows, this will be the
> > ideal use case for the runtime filter and the improvement will be
> > significant. Is this correct?
> >
> > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP,
> > will the value of max-build-data-size and min-probe-data-size depend on
> > the parallelism config? I.e., with the same data-size setting, is it
> > possible to inject or not inject runtime filters by adjusting the
> > parallelism?
> >
> > In the FLIP, there are default values for the new configuration
> > parameters that will be used to check the injection condition. If ndv
> > cannot be estimated, row count will be used. Given a max-build-data-size
> > of 10MB and a min-probe-data-size of 10GB, in the worst case the filter
> > ratio will be 0.999, i.e. the probeNdv is 1000 times the buildNdv. If we
> > consider duplication and the fact that the large table might have more
> > columns than the small table, the probeNdv should still be 100 or 10
> > times the buildNdv, which ends up with a ratio of 0.99 or 0.9. Both are
> > bigger than the default value 0.5 in the FLIP. If I am not mistaken,
> > commonly, a min-filter-ratio less than 0.99 will always allow injecting
> > the runtime filter. Does it make sense to reconsider the formula of ratio
> > calculation to help users easily control the filter injection?
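To make the arithmetic above concrete, here is a small Python sketch of the injection check. It assumes the ratio is estimated as `1 - buildNdv / probeNdv` (inferred from the "probeNdv is 1000 times buildNdv -> 0.999" example in this paragraph); the function names are illustrative, not the FLIP's actual API:

```python
def estimated_filter_ratio(build_ndv: int, probe_ndv: int) -> float:
    # Fraction of probe-side keys a perfect filter would drop, under the
    # assumption that the ratio is estimated as 1 - buildNdv / probeNdv.
    return 1.0 - build_ndv / probe_ndv

def should_inject(build_ndv: int, probe_ndv: int,
                  min_filter_ratio: float = 0.5) -> bool:
    # Inject the runtime filter only if the estimated ratio clears the bar.
    return estimated_filter_ratio(build_ndv, probe_ndv) >= min_filter_ratio

# probeNdv 1000x buildNdv -> ratio 0.999, far above the 0.5 default
assert should_inject(build_ndv=1_000, probe_ndv=1_000_000)
# probeNdv only 1.5x buildNdv -> ratio ~0.33, injection is rejected
assert not should_inject(build_ndv=1_000, probe_ndv=1_500)
```

As the paragraph argues, almost any realistic build/probe NDV gap clears the default 0.5 bar, which is exactly why the formula's sensitivity is being questioned.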
> >
> > Best regards,
> > Jing
> >
> > On Mon, Jun 19, 2023 at 4:42 PM Lijie Wang <wangdachui9...@gmail.com>
> > wrote:
> >
> > > Hi Stefan,
> > >
> > > >> bypassing the dataflow
> > > I believe it's a possible solution, but it may require more
> > > coordination and extra conditions (such as a DFS), so I think it should
> > > be excluded from the first version. I'll put it in "Future
> > > Improvements" as a potential improvement.
> > >
> > > Thanks again for your quick reply :)
> > >
> > > Best,
> > > Lijie
> > >
> > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 20:51:
> > >
> > > >
> > > > Hi Lijie,
> > > >
> > > > I think you understood me correctly. But I would not consider this a
> > > > true cyclic dependency in the dataflow, because I would not suggest
> > > > sending the filter through an edge in the job graph from join to
> > > > scan. I’d rather bypass the stream graph for bringing the filter to
> > > > the scan. For example, the join could report the filter after the
> > > > build phase, e.g. to the JM or a predefined DFS folder. Then, when
> > > > the probe scan is scheduled, the JM provides the filter information
> > > > to the scan, or the scan looks in DFS to see if it can find any
> > > > filter that it can use as part of its initialization. I’m not
> > > > suggesting to do it exactly in those ways, but just to show what I
> > > > mean by "bypassing the dataflow".
> > > >
> > > > Anyways, I’m fine with excluding this optimization from the current
> > FLIP
> > > > if you believe it would be hard to implement in Flink.
> > > >
> > > > Best,
> > > > Stefan
> > > >
> > > >
> > > > > On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >
> > > > > Hi Stefan,
> > > > >
> > > > > If I understand correctly (I hope so), the hash join operator needs
> > > > > to send the bloom filter to the probe scan, and the probe scan also
> > > > > needs to send the filtered data to the hash join operator. This
> > > > > means there would be a cycle in the data flow, and it would be hard
> > > > > for current Flink to schedule this kind of graph. I admit we could
> > > > > find a way to do this, but that's probably a bit outside the scope
> > > > > of this FLIP. So let's do these complex optimizations later, WDYT?
> > > > >
> > > > > Best,
> > > > > Lijie
> > > > >
> > > > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 18:15:
> > > > >
> > > > >> Hi Lijie,
> > > > >>
> > > > >> Exactly, my proposal was to build the bloom filter in the hash
> > > > >> operator. I don’t know all the details of the implementation of
> > > > >> Flink’s join operator, but I’d assume that even if the join is a
> > > > >> two-input operator, it gets scheduled as two different pipelines:
> > > > >> first the build phase with the scan of the dimension table, and
> > > > >> after that’s completed, the probe phase with the scan of the fact
> > > > >> table. I’m not proposing to use the bloom filter only in the join
> > > > >> operator, but rather to send the bloom filter to the probe scan
> > > > >> before starting the probe. I assume this would require some form
> > > > >> of side channel to transport the filter, and coordination to tell
> > > > >> the sources that such a filter is available. I cannot answer how
> > > > >> hard those would be to implement, but the idea doesn’t seem
> > > > >> impossible to me.
> > > > >>
> > > > >> Best,
> > > > >> Stefan
> > > > >>
> > > > >>
> > > > >>> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >>>
> > > > >>> Hi Stefan,
> > > > >>>
> > > > >>> Now I know what you mean about point 1. But currently it is
> > > > >>> unfeasible for Flink, because the building of the hash table
> > > > >>> happens inside the hash join operator. The hash join operator has
> > > > >>> two inputs: it will first process the data of the build input to
> > > > >>> build a hash table, and then use that hash table to process the
> > > > >>> data of the probe input. If we wanted to use the built hash table
> > > > >>> to deduplicate data for the bloom filter, we would have to put
> > > > >>> the bloom filter inside the hash join operator. However, in this
> > > > >>> way the data reaching the join operator cannot be reduced (the
> > > > >>> shuffle/network overhead cannot be reduced), which is not what we
> > > > >>> expected.
> > > > >>>
> > > > >>> Regarding the filter type, I agree with you: more types of
> > > > >>> filters can bring further optimization, and it is in our future
> > > > >>> plan (we described it in the section "Future Improvements - More
> > > > >>> underlying implementations").
> > > > >>>
> > > > >>> Best,
> > > > >>> Lijie
> > > > >>>
> > > > >>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 15:58:
> > > > >>>
> > > > >>>>
> > > > >>>> Hi Lijie,
> > > > >>>>
> > > > >>>> thanks for your response, I agree with what you said about
> points
> > 2
> > > > and
> > > > >> 3.
> > > > >>>> Let me explain a bit more about point 1. This would not apply
> > > > >>>> to all types of joins, and my suggestion is also *not* to build
> > > > >>>> a hash table only for the purpose of building the bloom filter.
> > > > >>>> I was thinking about the scenario of a hash join, where you
> > > > >>>> would build the hash table as part of the join algorithm anyway,
> > > > >>>> and then use the keyset of that hash table to 1) have better
> > > > >>>> insight into the NDV and 2) be able to construct the bloom
> > > > >>>> filter without duplicates and therefore faster.
> > > > >>>> So the precondition where I would use this is if you are
> > > > >>>> building a hash table as part of the join and you know you are
> > > > >>>> not building on a key column (because there would be no
> > > > >>>> duplicates to eliminate). Then your bloom filter construction
> > > > >>>> could already benefit from the deduplication work that was done
> > > > >>>> for building the hash table.
> > > > >>>>
> > > > >>>> I also wanted to point out that besides the bloom filter and IN
> > > > >>>> filter, you could also think of other types of filters that can
> > > > >>>> become interesting for certain distributions and metadata. For
> > > > >>>> example, if you have min/max information about columns and
> > > > >>>> partitions, you could have a bit vector represent equally sized
> > > > >>>> ranges of the key space between min and max, have the bits
> > > > >>>> indicate which parts of the range are present, and push that
> > > > >>>> information down to the scan.
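A minimal Python sketch of that min/max bit-vector idea (purely illustrative; the names and sizes are made up, this is not Flink code):

```python
def build_range_bitvector(keys, lo, hi, num_ranges):
    """Mark which equally sized sub-ranges of [lo, hi] contain a key."""
    bits = [False] * num_ranges
    width = (hi - lo + 1) / num_ranges
    for k in keys:
        bits[min(int((k - lo) / width), num_ranges - 1)] = True
    return bits

def may_contain(bits, lo, hi, num_ranges, key):
    if key < lo or key > hi:
        return False  # outside min/max -> definitely absent
    width = (hi - lo + 1) / num_ranges
    return bits[min(int((key - lo) / width), num_ranges - 1)]

# Build-side keys cluster into two of ten ranges of [0, 99].
bits = build_range_bitvector([5, 6, 90], lo=0, hi=99, num_ranges=10)
assert may_contain(bits, 0, 99, 10, 7)       # same range as 5 and 6
assert not may_contain(bits, 0, 99, 10, 50)  # empty range -> skippable
```

A scan receiving this bit vector could skip any partition whose key range maps only onto empty bits.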
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Stefan
> > > > >>>>
> > > > >>>>
> > > > >>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>> Hi Stefan,
> > > > >>>>>
> > > > >>>>> Thanks for your feedback. Let me briefly summarize the
> > > > >>>>> optimization points you mentioned above (please correct me if
> > > > >>>>> I'm wrong):
> > > > >>>>>
> > > > >>>>> 1. Build an extra hash table for deduplication before building
> > > > >>>>> the bloom filter.
> > > > >>>>> 2. Use a two-phase approach to build the bloom filter (first
> > > > >>>>> local, then OR-combine).
> > > > >>>>> 3. Use blocked bloom filters to improve cache efficiency.
> > > > >>>>>
> > > > >>>>> For the above 3 points, I have the following questions or
> > > > >>>>> opinions:
> > > > >>>>>
> > > > >>>>> For point 1, it seems that building a hash table also requires
> > > > >>>>> traversing all of the build-side data, so the overhead seems to
> > > > >>>>> be the same as building a bloom filter directly? In addition,
> > > > >>>>> the hash table takes up more space when the amount of data is
> > > > >>>>> large, which is why we chose a bloom filter instead of a hash
> > > > >>>>> table.
> > > > >>>>>
> > > > >>>>> For point 2, I think it's a good idea to use a two-phase
> > > > >>>>> approach to build the bloom filter. But rather than directly
> > > > >>>>> broadcasting the local bloom filters to the probe side, I
> > > > >>>>> prefer to introduce a global node for the OR-combine (like
> > > > >>>>> two-phase-agg[1]) and then broadcast the combined bloom filter
> > > > >>>>> to the probe side. The latter reduces the amount of data
> > > > >>>>> transferred over the network. I will change the FLIP
> > > > >>>>> accordingly.
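The local-build-then-OR-combine step can be sketched as follows (a toy Python bloom filter, not Flink's implementation; the bitwise OR-combine is valid because all local filters share the same size and hash functions):

```python
import hashlib

class BloomFilter:
    """Tiny bloom filter; all instances share m bits and k hash functions,
    which is what makes a bitwise OR-combine of local filters valid."""
    def __init__(self, m=1024, k=3, bits=0):
        self.m, self.k, self.bits = m, k, bits

    def _positions(self, key):
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode()).hexdigest()
            yield int(h, 16) % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key):
        return all(self.bits >> p & 1 for p in self._positions(key))

    def or_combine(self, other):
        # The global combine node would fold this over all local filters.
        return BloomFilter(self.m, self.k, self.bits | other.bits)

# Two "subtasks" each build a local filter over their build-side partition;
# the combined filter accepts every key seen by either subtask.
local1, local2 = BloomFilter(), BloomFilter()
local1.add("k1"); local2.add("k2")
combined = local1.or_combine(local2)
assert combined.might_contain("k1") and combined.might_contain("k2")
assert combined.bits == local1.bits | local2.bits
```

Combining at a single global node and broadcasting only the merged bitset is what saves network traffic compared to broadcasting every local filter to every probe task.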
> > > > >>>>>
> > > > >>>>> For point 3, I think it's a nice optimization, but I prefer to
> > > > >>>>> put it into the future improvements. There is already a bloom
> > > > >>>>> filter implementation in Flink that we can simply reuse.
> > > > >>>>> Introducing a new bloom filter implementation adds some
> > > > >>>>> complexity (we need to implement it, test it, etc.) and is not
> > > > >>>>> the focus of this FLIP.
> > > > >>>>>
> > > > >>>>> [1]
> > > > >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Lijie
> > > > >>>>>
> > > > >>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, Jun 16, 2023 at 16:45:
> > > > >>>>>
> > > > >>>>>> Hi,
> > > > >>>>>>
> > > > >>>>>> Thanks for the proposal of this feature! I have a question
> > > > >>>>>> about the filter build and some suggestions for potential
> > > > >>>>>> improvements. First, I wonder why you suggest running the
> > > > >>>>>> filter builder as a separate operator with parallelism 1. I’d
> > > > >>>>>> suggest integrating the distributed filter build with the hash
> > > > >>>>>> table build phase as follows:
> > > > >>>>>>
> > > > >>>>>> 1. Build the hash table completely in each subtask.
> > > > >>>>>> 2. The keyset of the hash table gives us a precise NDV count
> > > > >>>>>> for every subtask.
> > > > >>>>>> 3. Build a filter from the subtask hash table. For low
> > > > >>>>>> cardinality tables, I’d go with the suggested optimization of
> > > > >>>>>> the IN-filter.
> > > > >>>>>> 4. Each build subtask transfers its local bloom filter to all
> > > > >>>>>> probe operators.
> > > > >>>>>> 5. On the probe operator, we can either probe against the
> > > > >>>>>> individual filters, or OR-combine all subtask filters into an
> > > > >>>>>> aggregated bloom filter.
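Steps 1-3 of this scheme can be sketched like so (an illustrative Python sketch; the threshold, names, and filter layout are assumptions, not from the FLIP):

```python
def build_local_filter(rows, key_index, in_filter_threshold=64, bloom_bits=1024):
    # Step 1: build the hash table (here: key -> rows), deduplicating keys.
    table = {}
    for row in rows:
        table.setdefault(row[key_index], []).append(row)
    # Step 2: the keyset gives an exact per-subtask NDV.
    ndv = len(table)
    # Step 3: exact IN-filter for low cardinality, bloom filter otherwise;
    # each distinct key is inserted exactly once, thanks to the dedup above.
    if ndv <= in_filter_threshold:
        return ("in", set(table))
    bits = 0
    for k in table:
        bits |= 1 << (hash(("rf", k)) % bloom_bits)
    return ("bloom", bits)

rows = [(1, "a"), (1, "b"), (2, "c")]  # key 1 is duplicated
kind, f = build_local_filter(rows, key_index=0)
assert kind == "in" and f == {1, 2}    # low NDV -> exact IN-filter
```

The point of the sketch is only the ordering: the hash table the join needs anyway is built first, and the filter falls out of its already-deduplicated keyset.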
> > > > >>>>>>
> > > > >>>>>> I’m suggesting this because inserting into a (larger) bloom
> > > > >>>>>> filter can be costly, especially once the filter exceeds cache
> > > > >>>>>> sizes, and is therefore better parallelized. Inserting into
> > > > >>>>>> the hash table first also deduplicates the keys, so we avoid
> > > > >>>>>> inserting records twice into the bloom filter. If we want to
> > > > >>>>>> improve cache efficiency for the build of larger filters, we
> > > > >>>>>> could structure them as blocked bloom filters, where the
> > > > >>>>>> filter is separated into blocks and all bits of one key go
> > > > >>>>>> into only one block. That allows us to apply software-managed
> > > > >>>>>> buffering: first group keys that go into the same partition
> > > > >>>>>> (ideally fitting into cache), and then bulk-load partitions
> > > > >>>>>> once we have collected enough keys for one round of loading.
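A minimal sketch of such a blocked bloom filter with buffered bulk loading (illustrative Python; the block count, block size, and hashing scheme are arbitrary choices, not a real implementation):

```python
import hashlib

BLOCK_BITS = 512   # bits per block; in practice sized to cache lines
NUM_BLOCKS = 64
K = 4              # hash positions per key, all within one block

def _hashes(key):
    h = int(hashlib.sha256(str(key).encode()).hexdigest(), 16)
    block = h % NUM_BLOCKS  # all bits of one key go into one block
    bits = [(h >> (16 * (i + 1))) % BLOCK_BITS for i in range(K)]
    return block, bits

def build_blocked_filter(keys):
    blocks = [0] * NUM_BLOCKS
    # Software-managed buffering: first group keys by target block ...
    buffers = {}
    for key in keys:
        block, bits = _hashes(key)
        buffers.setdefault(block, []).extend(bits)
    # ... then bulk-load each block once, touching its memory only once.
    for block, bits in buffers.items():
        for b in bits:
            blocks[block] |= 1 << b
    return blocks

def might_contain(blocks, key):
    block, bits = _hashes(key)
    return all(blocks[block] >> b & 1 for b in bits)

blocks = build_blocked_filter(range(100))
assert all(might_contain(blocks, k) for k in range(100))
```

Because a lookup touches a single block instead of k scattered positions across the whole filter, both the buffered build and each probe stay cache-friendly.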
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Stefan
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Stefan Richter
> > > > >>>>>> Principal Engineer II, Confluent (https://www.confluent.io/)
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >>>>>>>
> > > > >>>>>>> Hi,  Benchao and Aitozi,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for your feedback about this FLIP.
> > > > >>>>>>>
> > > > >>>>>>> @Benchao
> > > > >>>>>>>
> > > > >>>>>>>>> I think it would be reasonable to also support "pipeline
> > > > >>>>>>>>> shuffle" if possible.
> > > > >>>>>>> As I said above, the runtime filter can work well with all
> > > > >>>>>>> shuffle modes, including pipeline shuffle.
> > > > >>>>>>>
> > > > >>>>>>>>> if the RuntimeFilterBuilder could be done more quickly
> > > > >>>>>>>>> than the RuntimeFilter operator, it can still filter out
> > > > >>>>>>>>> additional data afterwards.
> > > > >>>>>>> I think the main purpose of the runtime filter is to reduce
> > > > >>>>>>> the shuffled data and the data arriving at the join. Although
> > > > >>>>>>> eagerly running the large table side can process data in
> > > > >>>>>>> advance, most of that data may be irrelevant, causing huge
> > > > >>>>>>> shuffle overhead and slowing down the join. In addition, if
> > > > >>>>>>> the join is a hash join, the probe side of the hash join also
> > > > >>>>>>> needs to wait for its build side to complete, so the large
> > > > >>>>>>> table side is likely to be backpressured.
> > > > >>>>>>> Also, I'd rather not add too many configuration options in
> > > > >>>>>>> the first version, which may make the feature more difficult
> > > > >>>>>>> to use (users would need to understand a lot of internal
> > > > >>>>>>> implementation details). Maybe it could be a future
> > > > >>>>>>> improvement (if it's worthwhile)?
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> @Aitozi
> > > > >>>>>>>
> > > > >>>>>>>>> IMO, in the current implementation two source table
> > > > >>>>>>>>> operators will be executed simultaneously.
> > > > >>>>>>> The example in the FLIP uses blocking shuffle (I will add
> > > > >>>>>>> this point to the FLIP). The runtime filter is generally
> > > > >>>>>>> chained with the large table side to reduce the shuffled data
> > > > >>>>>>> (as shown in Figure 2 of the FLIP). The job vertices are
> > > > >>>>>>> scheduled in topological order, so the large table side can
> > > > >>>>>>> only be scheduled after the RuntimeFilterBuilder finishes.
> > > > >>>>>>>
> > > > >>>>>>>>> Are there some tests to show that the default value of
> > > > >>>>>>>>> table.optimizer.runtime-filter.min-probe-data-size, 10G, is
> > > > >>>>>>>>> a good default value?
> > > > >>>>>>> It's not tested yet, but it will be done before merging the
> > > > >>>>>>> code. The current value refers to systems such as Spark and
> > > > >>>>>>> Hive. Before code merging, we will test on TPC-DS 10T to find
> > > > >>>>>>> an optimal set of values. If you have relevant experience
> > > > >>>>>>> with it, you are welcome to give some suggestions.
> > > > >>>>>>>
> > > > >>>>>>>>> What's the representation of the runtime filter node in
> > > > >>>>>>>>> the planner?
> > > > >>>>>>> As shown in Figure 1 of the FLIP, we intend to add two new
> > > > >>>>>>> physical nodes, RuntimeFilterBuilder and RuntimeFilter.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Lijie
> > > > >>>>>>>
> > > > >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Lijie,
> > > > >>>>>>>>
> > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP,
> > > > >>>>>>>> I have some questions below:
> > > > >>>>>>>>
> > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > >>>>>>>>
> > > > >>>>>>>> How does it know to schedule the TableSource(dim) first?
> > > > >>>>>>>> IMO, in the current implementation the two source table
> > > > >>>>>>>> operators will be executed simultaneously.
> > > > >>>>>>>>
> > > > >>>>>>>>> If the data volume on the probe side is too small, the
> > > > >>>>>>>>> overhead of building the runtime filter is not worth it.
> > > > >>>>>>>>
> > > > >>>>>>>> Are there some tests to show that the default value of
> > > > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size, 10G, is
> > > > >>>>>>>> a good default value? The same for
> > > > >>>>>>>> table.optimizer.runtime-filter.max-build-data-size.
> > > > >>>>>>>>
> > > > >>>>>>>>> the runtime filter can be pushed down along the probe
> > > > >>>>>>>>> side, as close to data sources as possible
> > > > >>>>>>>>
> > > > >>>>>>>> What's the representation of the runtime filter node in the
> > > > >>>>>>>> planner? Is it a Filter node?
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>>
> > > > >>>>>>>> Aitozi.
> > > > >>>>>>>>
> > > > >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Lijie,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable
> > > > >>>>>>>>> to also support "pipeline shuffle" if possible.
> > > > >>>>>>>>>
> > > > >>>>>>>>> "Pipeline shuffle" is essential for OLAP/MPP computing.
> > > > >>>>>>>>> Although this has not been much exposed to users for now,
> > > > >>>>>>>>> I know a few companies that use Flink as an MPP computing
> > > > >>>>>>>>> engine, and there is an ongoing effort[1] to make this
> > > > >>>>>>>>> usage more powerful.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Back to your concern that "Even if the RuntimeFilter
> > > > >>>>>>>>> becomes running before the RuntimeFilterBuilder finished,
> > > > >>>>>>>>> it will not process any data and will occupy resources":
> > > > >>>>>>>>> whether it benefits us depends on the scale of the data.
> > > > >>>>>>>>> If the RuntimeFilterBuilder finishes more quickly than the
> > > > >>>>>>>>> RuntimeFilter operator, the filter can still filter out
> > > > >>>>>>>>> additional data afterwards. Hence, in my opinion, we do
> > > > >>>>>>>>> not need to make the edge between RuntimeFilterBuilder and
> > > > >>>>>>>>> RuntimeFilter BLOCKING only; at least it could be
> > > > >>>>>>>>> configurable.
> > > > >>>>>>>>>
> > > > >>>>>>>>> [1]
> > > > >>>>>>>>> https://issues.apache.org/jira/browse/FLINK-25318
> > > > >>>>>>>>>
> > > > >>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Yuxia,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I made a mistake in the above response.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The runtime filter can work well with all shuffle modes.
> > > > >>>>>>>>>> However, hybrid shuffle and blocking shuffle are currently
> > > > >>>>>>>>>> recommended for batch jobs (pipeline shuffle is not
> > > > >>>>>>>>>> recommended).
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> One more thing to mention here is that we will force the
> > > > >>>>>>>>>> edge between RuntimeFilterBuilder and RuntimeFilter to be
> > > > >>>>>>>>>> BLOCKING (regardless of which BatchShuffleMode is set),
> > > > >>>>>>>>>> because the RuntimeFilter really doesn’t need to run
> > > > >>>>>>>>>> before the RuntimeFilterBuilder has finished. Even if the
> > > > >>>>>>>>>> RuntimeFilter started running before the
> > > > >>>>>>>>>> RuntimeFilterBuilder finished, it would not process any
> > > > >>>>>>>>>> data and would just occupy resources.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Best,
> > > > >>>>>>>>>> Lijie
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi Yuxia,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for your feedback. The answers of your questions
> are
> > > as
> > > > >>>>>>>> follows:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 1. Yes, the row count comes from the statistics of the
> > > > >>>>>>>>>>> underlying table (or is estimated based on them, if the
> > > > >>>>>>>>>>> build side or probe side is not a TableScan). If the
> > > > >>>>>>>>>>> statistics are unavailable, we will not inject a runtime
> > > > >>>>>>>>>>> filter (as you said, we can hardly evaluate the
> > > > >>>>>>>>>>> benefits). Besides, AFAIK, the estimated data size of
> > > > >>>>>>>>>>> the build side is also based on the row count
> > > > >>>>>>>>>>> statistics; that is, if the statistics are unavailable,
> > > > >>>>>>>>>>> the requirement
> > > > >>>>>>>>>>> "table.optimizer.runtime-filter.max-build-data-size"
> > > > >>>>>>>>>>> cannot be evaluated either. I'll add this point to the
> > > > >>>>>>>>>>> FLIP.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 2.
> > > > >>>>>>>>>>> Estimated data size does not meet the requirement (in
> > > > >>>>>>>>>>> the planner optimization phase) -> no filter is
> > > > >>>>>>>>>>> injected.
> > > > >>>>>>>>>>> Estimated data size meets the requirement (in the
> > > > >>>>>>>>>>> planner optimization phase), but the real data size does
> > > > >>>>>>>>>>> not meet it (in the execution phase) -> a fake filter is
> > > > >>>>>>>>>>> output.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 3. Yes, the runtime filter is only for batch
> > > > >>>>>>>>>>> jobs/blocking shuffle.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> Lijie
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion. Excited to
> > > > >>>>>>>>>>>> see that a runtime filter is to be implemented in
> > > > >>>>>>>>>>>> Flink. I have a few questions about it:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated,
> use
> > > row
> > > > >>>> count
> > > > >>>>>>>>>>>> instead`. So, does row count comes from the statistic
> from
> > > > >>>>>>>> underlying
> > > > >>>>>>>>>>>> table? What if the the statistic is also unavailable
> > > > considering
> > > > >>>>>>>> users
> > > > >>>>>>>>>>>> maynot always remember to generate statistic in
> > production.
> > > > >>>>>>>>>>>> I'm wondering whether it make senese that just disable
> > > runtime
> > > > >>>>>>>> filter
> > > > >>>>>>>>> if
> > > > >>>>>>>>>>>> statistic is unavailable since in that case, we can
> hardly
> > > > >>>> evaluate
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>> benefits of runtime-filter.
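The injection decision and the ndv-to-row-count fallback quoted above could be sketched roughly as follows. This is a hypothetical illustration of the decision described in the FLIP, not Flink's planner code; the method signature, the ratio heuristic, and the convention of using a non-positive value for "unknown" are all assumptions made for the sketch:

```java
// Hypothetical sketch of the planner-side injection check: the build side
// must be small enough, the probe side large enough, and the estimated
// fraction of probe data the filter would remove must be worthwhile.
// When ndv is unknown (modeled here as <= 0), fall back to the row count,
// as the FLIP describes.
class InjectionCheckSketch {
    static boolean shouldInject(
            long buildDataSize, long maxBuildDataSize,
            long probeDataSize, long minProbeDataSize,
            double buildNdv, long buildRowCount,
            double probeNdv, long probeRowCount,
            double minFilterRatio) {
        if (buildDataSize > maxBuildDataSize) {
            return false; // build side too large to build a filter cheaply
        }
        if (probeDataSize < minProbeDataSize) {
            return false; // probe side too small to benefit
        }
        // "if the ndv cannot be estimated, use row count instead"
        double build = buildNdv > 0 ? buildNdv : buildRowCount;
        double probe = probeNdv > 0 ? probeNdv : probeRowCount;
        if (build <= 0 || probe <= 0) {
            return false; // no statistics at all: benefit cannot be evaluated
        }
        // Rough estimate of the fraction of probe rows the filter removes.
        double filterRatio = 1.0 - Math.min(build / probe, 1.0);
        return filterRatio >= minFilterRatio;
    }
}
```

With a one-row build side and a probe side of billions of rows (the extreme example from earlier in the thread), the estimated ratio approaches 1 and injection clearly pays off.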
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if the
> > > > >>>>>>>>>>>> following requirements are met: xxx", but it also said, "Once this
> > > > >>>>>>>>>>>> limit is exceeded, it will output a fake filter (which always
> > > > >>>>>>>>>>>> returns true)" in the `RuntimeFilterBuilderOperator` part. These
> > > > >>>>>>>>>>>> seem contradictory, so I'm wondering what the real behavior is:
> > > > >>>>>>>>>>>> will no filter be injected, or a fake filter?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 3: Does it also mean the runtime filter can only take effect in
> > > > >>>>>>>>>>>> blocking shuffle?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best regards,
> > > > >>>>>>>>>>>> Yuxia
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> ----- Original Message -----
> > > > >>>>>>>>>>>> From: "ron9 liu" <ron9....@gmail.com>
> > > > >>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
> > > > >>>>>>>>>>>> Sent: Wednesday, June 14, 2023 5:29:28 PM
> > > > >>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for
> > > > >>>>>>>>>>>> Flink Batch Jobs
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion. Runtime Filter is a
> > > > >>>>>>>>>>>> common optimization to improve join performance that has been
> > > > >>>>>>>>>>>> adopted by many computing engines such as Spark, Doris, etc.
> > > > >>>>>>>>>>>> Flink is a unified stream/batch computing engine, and we are
> > > > >>>>>>>>>>>> continuously optimizing batch performance. Runtime filter is a
> > > > >>>>>>>>>>>> general performance optimization technique that can improve the
> > > > >>>>>>>>>>>> performance of Flink batch jobs, so we are introducing it for
> > > > >>>>>>>>>>>> batch as well.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Looking forward to all feedback.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>> Ron
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023
> > > > >>>>>>>>>>>> at 17:17:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi devs
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Ron Liu, Gen Luo and I would like to start a discussion about
> > > > >>>>>>>>>>>>> FLIP-324: Introduce Runtime Filter for Flink Batch Jobs [1].
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Runtime Filter is a common optimization to improve join
> > > > >>>>>>>>>>>>> performance. It is designed to dynamically generate filter
> > > > >>>>>>>>>>>>> conditions for certain join queries at runtime to reduce the
> > > > >>>>>>>>>>>>> amount of scanned or shuffled data, avoid unnecessary I/O and
> > > > >>>>>>>>>>>>> network transmission, and speed up the query. Its working
> > > > >>>>>>>>>>>>> principle is to first build a filter (e.g. a bloom filter) based
> > > > >>>>>>>>>>>>> on the data on the small table side (build side), then pass this
> > > > >>>>>>>>>>>>> filter to the large table side (probe side) to filter out the
> > > > >>>>>>>>>>>>> irrelevant data there. This reduces the data reaching the join
> > > > >>>>>>>>>>>>> and improves performance.
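The build/probe principle described in the announcement can be illustrated with a minimal, self-contained bloom-filter sketch. This is not Flink's actual implementation; the class and its hash scheme are hypothetical and chosen only to show the mechanism:

```java
import java.util.BitSet;

// Minimal bloom-filter sketch: build it from the small (build) side's join
// keys, then use it on the large (probe) side to drop rows that cannot
// possibly match. False positives are allowed (some non-matching rows still
// pass), but an added key is never rejected, so no join matches are lost.
class BloomFilterSketch {
    private final BitSet bits;
    private final int size;
    private final int numHashes;

    BloomFilterSketch(int size, int numHashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.numHashes = numHashes;
    }

    private int index(Object key, int seed) {
        // Mix the key's hash with the seed; floorMod keeps the index in range.
        return Math.floorMod(key.hashCode() * 31 + seed * 0x9E3779B9, size);
    }

    /** Build side: record each join key. */
    void add(Object key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(key, i));
        }
    }

    /** Probe side: keep the row only if the key might be on the build side. */
    boolean mightContain(Object key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) {
                return false; // definitely not on the build side
            }
        }
        return true; // possibly on the build side (or a false positive)
    }
}
```

In a join, every probe-side row whose key fails `mightContain` is discarded before the shuffle, which is exactly where the I/O and network savings come from.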
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> You can find more details in FLIP-324 [1]. Looking forward to
> > > > >>>>>>>>>>>>> your feedback.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> [1]
> > > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>> Ron & Gen & Lijie
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> --
> > > > >>>>>>>>>
> > > > >>>>>>>>> Best,
> > > > >>>>>>>>> Benchao Li
> > > >
> > > >
> > >
> >
>
