Hi,

In general, I like the proposal as well. We should try to integrate all forms
of keyed state with the backend, to avoid the problems that we are currently
facing with the timer service. We should discuss which exact implementation
of Bloom filters is the best fit.
@Fabian: There are also implementations of Bloom filters that use counting
and therefore support deletes, but this comes at the cost of potentially
higher space consumption.

Best,
Stefan

> On 23.05.2018 at 11:29, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Thanks for the proposal Sihua!
>
> Let me try to summarize the motivation / scope of this proposal.
>
> You are proposing to add support for a special Bloom Filter state per
> KeyGroup and reduce the number of key accesses by checking the Bloom Filter
> first.
>
> This would be a rather generic feature that could be interesting for
> various applications, including joins and deduplication as you described.
>
> IMO, such a feature would be very interesting. However, my concern with
> Bloom Filters is that they are insert-only data structures, i.e., it is not
> possible to remove keys once they were added. This might render the filter
> useless over time.
> In a different thread (see discussion in FLINK-8918 [1]), you mentioned that
> the Bloom Filters would be growing.
> If we keep them in memory, how can we prevent them from exceeding memory
> boundaries over time?
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8918
>
> 2018-05-23 9:56 GMT+02:00 sihua zhou <summerle...@163.com>:
> Hi Devs!
> I propose to introduce an "Elastic Bloom Filter" for Flink. The reason I am
> making this proposal is that it has helped us a lot in production: it lets
> us improve performance while reducing resource consumption.
> Here is a brief description of the motivation and of why it is so powerful.
> More detailed information can be found at
> https://issues.apache.org/jira/browse/FLINK-8601 , and the design doc can
> be found at
> https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
>
> ------------------------------------
> Motivation
>
> Several scenarios drove us to introduce this ElasticBloomFilter: one is
> Stream Join, another is Data Deduplication, plus some special use cases.
> It has served us well; for example, we implemented the Runtime Filter Join
> based on it, and it gave us a great performance improvement. This feature
> sets our join apart from the "normal stream join": it allows us to improve
> performance while reducing resource consumption by about half!
> I will list the two most typical use cases optimized by the
> ElasticBloomFilter: "Runtime Filter Join" in detail, and "Data
> Deduplication" in brief.
>
> Scenario 1: Runtime Filter Join
>
> In general, stream join is one of the most performance-costly tasks. For
> every record from either side, we need to query the state of the other side,
> which leads to poor performance when the state size is huge. So, in
> production, we always need to spend a lot of slots to handle stream joins.
> But we can improve this, because of a phenomenon of stream joins that can be
> observed in production: the "joined ratio" of a stream join is often very
> low. For example:
>
> stream join in promotion analysis: the job needs to join the promotion log
> with the action (click, view, buy) log on the promotion_id to analyze the
> effect of the promotion.
>
> stream join in AD (advertising) attribution: the job needs to join the AD
> click log with the item payment log on the click_id to find which click on
> which AD brought the payment, in order to do attribution.
>
> stream join in click log analysis of docs: the job needs to join the view
> log (docs viewed by users) with the click log (docs clicked by users) to
> analyze the reason for the click and the properties of the users.
>
> ... and so on.
>
> All these cases have one common property: the joined ratio is very low.
> Here is an example: we have 10000 records from the left stream and 10000
> records from the right stream, and we execute
> select * from leftStream l join rightStream r on l.id = r.id
> and get only 100 records in the result. That is a case of low joined ratio.
> This example is an inner join, but the same applies to left & right joins.
>
> There are more examples of low joined ratio I can come up with, but the
> point I want to raise is that a low joined ratio of stream joins in
> production is a very common phenomenon (maybe even the most common one in
> some companies; at least in our company that is the case).
>
> How to improve this?
>
> We can see from the above case that 10000 records join 10000 records and we
> only get 100 results. That means we query the state 20000 times (10000 for
> the left stream and 10000 for the right stream) but only 100 of those
> queries are meaningful! If we could reduce the number of useless queries,
> we could definitely improve the performance of stream join.
> The way we improved this is to introduce the Runtime Filter Join. The main
> idea is that we build a filter for the state on each side (left stream &
> right stream).
> When we need to query the state on one side, we first check the
> corresponding filter to see whether the key can possibly be in the state.
> If the filter says "no, it cannot be in the state", we skip the state
> query; if it says "hmm, it may be in the state", we query the state.
> As you can see, the best choice for the filter is a Bloom Filter. It has all
> the features we want: extremely good performance and no false negatives.
>
> Scenario 2: Data Deduplication
>
> We have implemented two general functions based on the ElasticBloomFilter:
> count(distinct x) and select distinct x, y, z from table. Unlike the
> Runtime Filter Join, the result of these two functions is approximate, not
> exact. They are used in scenarios where we don't need a 100% accurate
> result, for example, counting the number of visiting users in each online
> store. In general, we don't need a 100% accurate result in this case
> (indeed, we can't give one, because there can be errors when collecting
> user_id from different devices). If we could get a 98% accurate result with
> only 1/2 of the resources, that would be very nice.
>
> I believe there are more use cases in the stream world that could be
> optimized by the Bloom Filter (as it has done in the big data world)...
>
> I would appreciate it very much if someone could have a look at the JIRA or
> the Google doc and give some comments!
>
> Thanks, Sihua
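
For reference, the filter-before-state check described under Scenario 1 can be sketched roughly as below. This is only an illustration, not the ElasticBloomFilter from FLINK-8601 and not a Flink API: the class FilteredStateSketch, its method names, the HashMap standing in for keyed state, and the double-hashing scheme are all assumptions made up for this example.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: a key/value store guarded by a fixed-size Bloom filter. */
final class FilteredStateSketch {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;
    // Stand-in for the (expensive) state backend of one join side.
    private final Map<String, String> state = new HashMap<>();
    private int stateQueries = 0;

    FilteredStateSketch(int numBits, int numHashes) {
        this.numBits = numBits;
        this.numHashes = numHashes;
        this.bits = new BitSet(numBits);
    }

    // Double hashing: the i-th probe is h1 + i * h2 (mod numBits).
    private int index(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1; // odd step
        return Math.floorMod(h1 + i * h2, numBits);
    }

    /** Adds the key to the filter and stores the value in the state. */
    void put(String key, String value) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(key, i));
        }
        state.put(key, value);
    }

    /** False means "definitely not in the state"; true means "maybe". */
    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }

    /** Queries the state only when the filter cannot rule the key out. */
    String get(String key) {
        if (!mightContain(key)) {
            return null; // skip the state query entirely
        }
        stateQueries++;
        return state.get(key); // may still be null on a false positive
    }

    /** Number of lookups that actually reached the state. */
    int stateQueries() {
        return stateQueries;
    }
}
```

In a join, each side would keep one such structure and call get() with the join key of every record arriving from the other side. With a low joined ratio, most calls return before touching the state. Because this sketch uses a fixed-size, insert-only filter, it does not address the growth and deletion concerns raised above.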