Hi Lijie,

Exactly, my proposal was to build the bloom filter in the hash operator. I 
don’t know all the details of the implementation of Flink’s join 
operator, but I’d assume that even if the join is a two-input operator, it gets 
scheduled as 2 different pipelines: first the build phase with the scan of 
the dimension table and, once that has completed, the probe phase with the scan 
of the fact table. I’m not proposing to use the bloom filter only in the join 
operator, but rather to send the bloom filter to the probe scan before starting 
the probe. I assume this would require some form of side channel to transport 
the filter and coordination to tell the sources that such a filter is 
available. I cannot say how hard those would be to implement, but the idea 
doesn’t seem impossible to me.
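
To make the idea more concrete, here is a minimal Python sketch of the flow I
have in mind. It is not Flink code and makes several assumptions: the names
BloomFilter, build_phase and probe_scan are hypothetical, the filter sizes are
arbitrary, and the "side channel" is modeled as a plain function argument.
Each build subtask builds its hash table, fills a local bloom filter from the
deduplicated keyset (which also yields the exact local NDV), the local filters
are OR-combined, and the probe scan drops rows before they would be shuffled.

```python
import hashlib


class BloomFilter:
    """Toy bloom filter over a fixed number of bits (illustrative only)."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions from salted digests of the key.
        data = repr(key).encode("utf-8")
        for i in range(self.num_hashes):
            digest = hashlib.blake2b(data, digest_size=8, salt=bytes([i])).digest()
            yield int.from_bytes(digest, "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

    def union(self, other):
        # OR-combine two filters; both must share size and hash count.
        assert (self.num_bits, self.num_hashes) == (other.num_bits, other.num_hashes)
        merged = BloomFilter(self.num_bits, self.num_hashes)
        merged.bits = bytearray(a | b for a, b in zip(self.bits, other.bits))
        return merged


def build_phase(build_rows, key_of, num_bits=1024, num_hashes=3):
    """Build the join hash table, then derive the filter from its keyset."""
    hash_table = {}
    for row in build_rows:
        hash_table.setdefault(key_of(row), []).append(row)
    bloom = BloomFilter(num_bits, num_hashes)
    for key in hash_table:  # each distinct key is inserted exactly once
        bloom.add(key)
    return hash_table, bloom, len(hash_table)  # len(...) is the exact local NDV


def probe_scan(fact_rows, key_of, bloom):
    """Probe-side scan that drops rows the filter rules out."""
    return [row for row in fact_rows if bloom.might_contain(key_of(row))]


# Two build subtasks over a dimension table with duplicate join keys.
dim_subtasks = [[(1, "a"), (1, "b"), (2, "c")], [(3, "d"), (3, "e")]]
results = [build_phase(rows, key_of=lambda r: r[0]) for rows in dim_subtasks]
local_ndvs = [ndv for _, _, ndv in results]
combined = results[0][1].union(results[1][1])

fact_rows = [(k, "fact-%d" % k) for k in range(100)]
survivors = probe_scan(fact_rows, lambda r: r[0], combined)
surviving_keys = {row[0] for row in survivors}
```

A bloom filter never produces false negatives, so every fact row with key 1, 2
or 3 survives the scan; a few other rows may survive as false positives and
are then eliminated by the join itself.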

Best,
Stefan


> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
> 
> Hi Stefan,
> 
> Now I understand what you mean by point 1. But currently this is infeasible
> in Flink, because the hash table is built inside the hash join operator.
> The hash join operator has two inputs: it first processes the data of the
> build input to build a hash table, and then uses the hash table to process
> the data of the probe input. If we want to use the built hash table to
> deduplicate data for the bloom filter, we must put the bloom filter inside
> the hash join operator. However, in this way, the data reaching the join
> operator cannot be reduced (the shuffle/network overhead cannot be
> reduced), which is not what we want.
> 
> Regarding the filter type, I agree with you: more types of filters can
> enable further optimization, and this is in our future plans (we describe
> it in the section Future+Improvements#More+underlying+implementations).
> 
> Best,
> Lijie
> 
> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 15:58:
> 
>> 
>> Hi Lijie,
>> 
>> thanks for your response, I agree with what you said about points 2 and 3.
>> Let me explain a bit more about point 1. This would not apply to all types
>> of joins, and my suggestion is also *not* to build a hash table only for the
>> purpose of building the bloom filter.
>> I was thinking about the scenario of a hash join, where you would build
>> the hash table as part of the join algorithm anyway and then use the
>> keyset of that hash table to 1) get better insight into the NDV and 2) be
>> able to construct the bloom filter without duplicates and therefore faster.
>> So the preconditions under which I would use this are that you are building
>> a hash table as part of the join and you know you are not building on a key
>> column (because then there would be no duplicates to eliminate). In that
>> case, your bloom filter construction could benefit from the deduplication
>> work that was already done for building the hash table.
>> 
>> I also wanted to point out that besides the bloom filter and the IN filter
>> you could also think of other types of filters that can become interesting
>> for certain distributions and metadata. For example, if you have min/max
>> information about columns and partitions, you could have a bit vector
>> represent equal-sized ranges of the key space between min and max, have
>> each bit indicate whether any key in its range is present, and push that
>> information down to the scan.
>> 
>> Best,
>> Stefan
>> 
>> 
>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>> 
>>> Hi Stefan,
>>> 
>>> Thanks for your feedback. Let me briefly summarize the optimization points
>>> you mentioned above (please correct me if I'm wrong):
>>> 
>>> 1. Build an extra hash table for deduplication before building the bloom
>>> filter.
>>> 2. Use the two-phase approach to build the bloom filter (first local, then
>>> OR-combine).
>>> 3. Use blocked bloom filters to improve the cache efficiency.
>>> 
>>> For the above 3 points, I have the following questions or opinions:
>>> 
>>> For point 1, it seems that building a hash table also requires traversing
>>> all build-side data, so the overhead seems to be the same as building a
>>> bloom filter directly? In addition, the hash table will take up more space
>>> when the amount of data is large, which is why we chose to use a bloom
>>> filter instead of a hash table.
>>> 
>>> For point 2, I think it's a good idea to use the two-phase approach to
>>> build the bloom filter. But rather than directly broadcasting the local
>>> bloom filters to the probe side, I prefer to introduce a global node for the
>>> OR-combine (like two-phase-agg[1]) and then broadcast the combined bloom
>>> filter to the probe side. The latter reduces the amount of data transferred
>>> over the network. I will change the FLIP accordingly.
>>> 
>>> For point 3, I think it's a nice optimization, but I prefer to leave it to
>>> future improvements. There is already an implementation of a bloom filter
>>> in Flink that we can simply reuse. Introducing a new bloom filter
>>> implementation adds some complexity (we need to implement it, test
>>> it, etc.) and is not the focus of this FLIP.
>>> 
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
>>> 
>>> Best,
>>> Lijie
>>> 
>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, Jun 16, 2023 at 16:45:
>>> 
>>>> Hi,
>>>> 
>>>> Thanks for proposing this feature! I have a question about the filter
>>>> build and some suggestions for potential improvements. First, I wonder
>>>> why you suggest running the filter builder as a separate operator with
>>>> parallelism 1. I’d suggest integrating a distributed filter build with
>>>> the hash table build phase as follows:
>>>> 
>>>> 1. Build the hash table completely in each subtask.
>>>> 2. The keyset of the hash table gives us a precise NDV count for every
>>>> subtask.
>>>> 3. Build a filter from the subtask hash table. For low-cardinality tables,
>>>> I’d go with the suggested optimization of an IN-filter.
>>>> 4. Each build subtask transfers its local bloom filter to all probe
>>>> operators.
>>>> 5. On the probe operator we can either probe against the individual
>>>> filters, or OR-combine all subtask filters into an aggregated bloom filter.
>>>> 
>>>> I’m suggesting this because inserting into a (larger) bloom filter can
>>>> be costly, especially once the filter exceeds cache sizes, and is
>>>> therefore better parallelized. Inserting into the hash table first also
>>>> deduplicates the keys, so we avoid inserting records twice into the bloom
>>>> filter. If we want to improve cache efficiency for the build of larger
>>>> filters, we could structure them as blocked bloom filters, where the filter
>>>> is separated into blocks and all bits of one key go into only one block.
>>>> That allows us to apply software-managed buffering to first group keys that
>>>> go into the same partition (ideally fitting into cache) and then bulk-load
>>>> partitions once we have collected enough keys for one round of loading.
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>> 
>>>> Stefan Richter
>>>> Principal Engineer II
>>>> 
>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>>>> 
>>>>> Hi Benchao and Aitozi,
>>>>> 
>>>>> Thanks for your feedback about this FLIP.
>>>>> 
>>>>> @Benchao
>>>>> 
>>>>>>> I think it would be reasonable to also support "pipeline shuffle" if
>>>>>>> possible.
>>>>> As I said above, the runtime filter works well with all shuffle modes,
>>>>> including pipeline shuffle.
>>>>> 
>>>>>>> if the RuntimeFilterBuilder finishes more quickly than the
>>>>>>> RuntimeFilter operator, it can still filter out additional data
>>>>>>> afterwards.
>>>>> I think the main purpose of the runtime filter is to reduce the shuffle
>>>>> data and the data arriving at the join. Although eagerly running the
>>>>> large table side can process data in advance, most of that data may be
>>>>> irrelevant, causing huge shuffle overhead and slowing down the join. In
>>>>> addition, if the join is a hash join, the probe side of the hash join also
>>>>> needs to wait for its build side to complete, so the large table side is
>>>>> likely to be backpressured.
>>>>> Furthermore, I'd rather not add too many configuration options in the
>>>>> first version, as that may make it more difficult to use (users would
>>>>> need to understand a lot of internal implementation details). Maybe it
>>>>> could be a future improvement (if it's worthwhile)?
>>>>> 
>>>>> 
>>>>> @Aitozi
>>>>> 
>>>>>>> IMO, in the current implementation two source table operators will be
>>>>>>> executed simultaneously.
>>>>> The example in the FLIP uses blocking shuffle (I will add this point to
>>>>> the FLIP). The runtime filter is generally chained with the large table
>>>>> side to reduce the shuffle data (as shown in Figure 2 of the FLIP). The
>>>>> job vertices should be scheduled in topological order, so the large table
>>>>> side can only be scheduled after the RuntimeFilterBuilder finishes.
>>>>> 
>>>>>>> Are there some tests to show the default value of
>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
>>>>>>> default value.
>>>>> It's not tested yet, but that will be done before merging the code. The
>>>>> current value is based on systems such as Spark and Hive. Before merging,
>>>>> we will test on TPC-DS 10T to find an optimal set of values. If you have
>>>>> relevant experience, suggestions are welcome.
>>>>> 
>>>>>>> What's the representation of the runtime filter node in planner ?
>>>>> As shown in Figure 1 of FLIP, we intend to add two new physical nodes,
>>>>> RuntimeFilterBuilder and RuntimeFilter.
>>>>> 
>>>>> Best,
>>>>> Lijie
>>>>> 
>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
>>>>> 
>>>>>> Hi Lijie,
>>>>>> 
>>>>>> Nice to see this valuable feature. After reading the FLIP, I have some
>>>>>> questions:
>>>>>> 
>>>>>>> Schedule the TableSource(dim) first.
>>>>>> 
>>>>>> How does it know to schedule the TableSource(dim) first? IMO, in the
>>>>>> current implementation the two source table operators will be executed
>>>>>> simultaneously.
>>>>>> 
>>>>>>> If the data volume on the probe side is too small, the overhead of
>>>>>> building runtime filter is not worth it.
>>>>>> 
>>>>>> Are there any tests showing that the default value of 10G for
>>>>>> table.optimizer.runtime-filter.min-probe-data-size is a good default?
>>>>>> The same question applies to
>>>>>> table.optimizer.runtime-filter.max-build-data-size.
>>>>>> 
>>>>>>> the runtime filter can be pushed down along the probe side, as close to
>>>>>>> data sources as possible
>>>>>> 
>>>>>> What's the representation of the runtime filter node in the planner? Is
>>>>>> it a Filter node?
>>>>>> Best,
>>>>>> 
>>>>>> Aitozi.
>>>>>> 
>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
>>>>>> 
>>>>>>> Hi Lijie,
>>>>>>> 
>>>>>>> Regarding the shuffle mode, I think it would be reasonable to also
>>>>>>> support "pipeline shuffle" if possible.
>>>>>>> 
>>>>>>> "Pipeline shuffle" is essential for OLAP/MPP computing. Although this
>>>>>>> has not been much exposed to users so far, I know a few companies that
>>>>>>> use Flink as an MPP computing engine, and there is an ongoing effort[1]
>>>>>>> to make this usage more powerful.
>>>>>>> 
>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes running
>>>>>>> before the RuntimeFilterBuilder finished, it will not process any data
>>>>>>> and will occupy resources": whether it benefits us depends on the scale
>>>>>>> of the data. If the RuntimeFilterBuilder finishes more quickly than the
>>>>>>> RuntimeFilter operator, it can still filter out additional data
>>>>>>> afterwards. Hence, in my opinion, we do not need to make the edge
>>>>>>> between RuntimeFilterBuilder and RuntimeFilter BLOCKING only; at least
>>>>>>> it should be configurable.
>>>>>>> 
>>>>>>> [1]
>>>>>>> https://issues.apache.org/jira/browse/FLINK-25318
>>>>>>> 
>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
>>>>>>> 
>>>>>>>> Hi Yuxia,
>>>>>>>> 
>>>>>>>> I made a mistake in the above response.
>>>>>>>> 
>>>>>>>> The runtime filter can work well with all shuffle modes. However, hybrid
>>>>>>>> shuffle and blocking shuffle are currently recommended for batch jobs
>>>>>>>> (pipeline shuffle is not recommended).
>>>>>>>> 
>>>>>>>> One more thing to mention here is that we will force the edge between
>>>>>>>> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of
>>>>>>>> which BatchShuffleMode is set), because the RuntimeFilter really doesn’t
>>>>>>>> need to run before the RuntimeFilterBuilder has finished. Even if the
>>>>>>>> RuntimeFilter starts running before the RuntimeFilterBuilder has
>>>>>>>> finished, it will not process any data and will only occupy resources.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Lijie
>>>>>>>> 
>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
>>>>>>>> 
>>>>>>>>> Hi Yuxia,
>>>>>>>>> 
>>>>>>>>> Thanks for your feedback. The answers to your questions are as follows:
>>>>>>>>> 
>>>>>>>>> 1. Yes, the row count comes from the statistics of the underlying table
>>>>>>>>> (or is estimated based on the statistics of the underlying table, if the
>>>>>>>>> build side or probe side is not a TableScan). If the statistics are
>>>>>>>>> unavailable, we will not inject a runtime filter (as you said, we can
>>>>>>>>> hardly evaluate the benefits). Besides, AFAIK, the estimated data size
>>>>>>>>> of the build side is also based on the row count statistics; that is, if
>>>>>>>>> the statistics are unavailable, the requirement
>>>>>>>>> "table.optimizer.runtime-filter.max-build-data-size" cannot be
>>>>>>>>> evaluated either. I'll add this point to the FLIP.
>>>>>>>>> 
>>>>>>>>> 2.
>>>>>>>>> Estimated data size does not meet the requirement (in the planner
>>>>>>>>> optimization phase) -> No filter
>>>>>>>>> Estimated data size meets the requirement (in the planner optimization
>>>>>>>>> phase), but the real data size does not meet the requirement (in the
>>>>>>>>> execution phase) -> Fake filter
>>>>>>>>> 
>>>>>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Lijie
>>>>>>>>> 
>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
>>>>>>>>> 
>>>>>>>>>> Thanks Lijie for starting this discussion. Excited to see that runtime
>>>>>>>>>> filter is going to be implemented in Flink.
>>>>>>>>>> I have a few questions about it:
>>>>>>>>>> 
>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
>>>>>>>>>> instead`. So, does the row count come from the statistics of the
>>>>>>>>>> underlying table? What if the statistics are also unavailable,
>>>>>>>>>> considering that users may not always remember to generate statistics
>>>>>>>>>> in production?
>>>>>>>>>> I'm wondering whether it makes sense to just disable the runtime filter
>>>>>>>>>> if statistics are unavailable, since in that case we can hardly
>>>>>>>>>> evaluate the benefits of the runtime filter.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if the
>>>>>>>>>> following requirements are met:xxx", but it also said, "Once this
>>>>>>>>>> limit is exceeded, it will output a fake filter(which always returns
>>>>>>>>>> true)" in the `RuntimeFilterBuilderOperator` part. These seem
>>>>>>>>>> contradictory, so I'm wondering what the real behavior is: will no
>>>>>>>>>> filter be injected, or a fake filter?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 3: Does it also mean that the runtime filter can only take effect with
>>>>>>>>>> blocking shuffle?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Best regards,
>>>>>>>>>> Yuxia
>>>>>>>>>> 
>>>>>>>>>> ----- Original Message -----
>>>>>>>>>> From: "ron9 liu" <ron9....@gmail.com>
>>>>>>>>>> To: "dev" <dev@flink.apache.org>
>>>>>>>>>> Sent: Wednesday, June 14, 2023, 5:29:28 PM
>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink
>>>>>>>>>> Batch Jobs
>>>>>>>>>> 
>>>>>>>>>> Thanks, Lijie, for starting this discussion. Runtime filter is a common
>>>>>>>>>> optimization for improving join performance that has been adopted by
>>>>>>>>>> many computing engines such as Spark, Doris, etc. Flink is a unified
>>>>>>>>>> streaming and batch computing engine, and we are continuously
>>>>>>>>>> optimizing batch performance. Runtime filter is a general performance
>>>>>>>>>> optimization technique that can improve the performance of Flink batch
>>>>>>>>>> jobs, so we are introducing it for batch as well.
>>>>>>>>>> 
>>>>>>>>>> Looking forward to all feedback.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Ron
>>>>>>>>>> 
>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
>>>>>>>>>> 
>>>>>>>>>>> Hi devs
>>>>>>>>>>> 
>>>>>>>>>>> Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
>>>>>>>>>>> Introduce Runtime Filter for Flink Batch Jobs[1]
>>>>>>>>>>> 
>>>>>>>>>>> Runtime filter is a common optimization for improving join performance.
>>>>>>>>>>> It is designed to dynamically generate filter conditions for certain
>>>>>>>>>>> join queries at runtime to reduce the amount of scanned or shuffled
>>>>>>>>>>> data, avoid unnecessary I/O and network transmission, and speed up the
>>>>>>>>>>> query. It works by first building a filter (e.g. a bloom filter) based
>>>>>>>>>>> on the data on the small table side (build side), then passing this
>>>>>>>>>>> filter to the large table side (probe side) to filter out the
>>>>>>>>>>> irrelevant data there. This reduces the data reaching the join and
>>>>>>>>>>> improves performance.
>>>>>>>>>>> 
>>>>>>>>>>> You can find more details in FLIP-324[1]. Looking forward to your
>>>>>>>>>>> feedback.
>>>>>>>>>>> 
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> Ron & Gen & Lijie
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> 
>>>>>>> Best,
>>>>>>> Benchao Li
