Hi Dian,

I still think your implementation is similar to the window operator. You mentioned the scalable trigger mechanism, but the window API can also customize triggers.
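For example, with the existing window API a custom trigger can already fire on an element count or a processing-time timeout. A minimal sketch against Flink's Trigger API (the class name and both thresholds are illustrative, not taken from the proposal):

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires (and purges) the window when maxCount elements have arrived, or
// when timeoutMs has elapsed since the first element, whichever comes first.
public class CountOrTimeoutTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final long timeoutMs;

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", (a, b) -> a + b, Long.class);

    public CountOrTimeoutTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        if (count.get() == null) {
            // first element for this key/window: arm the timeout timer
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMs);
        }
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            // (a production trigger would also delete the pending timer)
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }
}

Flink also ships predefined triggers such as CountTrigger and ContinuousProcessingTimeTrigger; a trigger like the above would be attached via stream.keyBy(...).window(...).trigger(new CountOrTimeoutTrigger(1000, 10000)).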
Moreover, IMO, the design should guarantee deterministic semantics; triggering based on memory availability is a non-deterministic design. In addition, if the implementation depends on the timing of checkpoints, I do not think that is reasonable; we should avoid affecting the checkpoint's progress.

Best,
Vino

On Wed, Jun 5, 2019 at 1:55 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Vino,
>
> Thanks a lot for your reply.
>
> > 1) When, why, and how do we judge that memory is exhausted?
>
> My point here is that the local aggregate operator can buffer the inputs in memory and send out the results AT ANY TIME, i.e. when the element count or the time interval reaches a pre-configured value, when the memory usage of the buffered elements reaches a configured value (supposing we can estimate the object size efficiently), or even when a checkpoint is triggered.
>
> > 2) If the local aggregate operator rarely needs to operate on state, what do you think about fault tolerance?
>
> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier` which can be used here to send out the results to the downstream when a checkpoint is triggered. Then fault tolerance can work well.
>
> Even if there were no such method available, we could still store the buffered elements or pre-aggregated results in state when a checkpoint is triggered. The state access will be much lower compared with the window operator, as only the elements not yet sent out when the checkpoint occurs have to be written to state. Suppose the checkpoint interval is 3 minutes and the trigger interval is 10 seconds; then less than about 10/180 (roughly 5.6%) of the elements will be written to state.
>
> Thanks,
> Dian
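For illustration, a minimal sketch of the kind of operator Dian describes: partial results live in a plain HashMap, flushed on a count threshold and in prepareSnapshotPreBarrier. The operator name, types, and threshold are hypothetical; only AbstractStreamOperator, OneInputStreamOperator, and prepareSnapshotPreBarrier are existing Flink APIs.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical local-aggregation operator: keeps partial sums in a plain
// HashMap (no keyed state) and emits them when a count threshold is hit
// or when a checkpoint barrier is about to be snapshotted.
public class LocalSumOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int FLUSH_THRESHOLD = 10_000; // illustrative value
    private transient Map<String, Long> buffer;
    private transient int count;

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> record) throws Exception {
        Tuple2<String, Long> in = record.getValue();
        buffer.merge(in.f0, in.f1, Long::sum);
        if (++count >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    // Called just before the operator's state is snapshotted, so the buffer
    // is always empty at checkpoint time and nothing needs to go into state.
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
        buffer.clear();
        count = 0;
    }
}

Because the buffer is always flushed before the barrier is snapshotted, the operator holds no state at checkpoint time.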
> > On Jun 5, 2019, at 11:43 AM, Biao Liu <mmyy1...@gmail.com> wrote:
> >
> > Hi Vino,
> >
> > +1 for this feature. It's useful for data skew, and it could also reduce the amount of shuffled data.
> >
> > I have some concerns about the API part. From my side, this feature should be more of an improvement, and I'm afraid the proposal is overkill on the API side. Many other systems support pre-aggregation as an optimization of global aggregation. The optimization might be applied automatically or manually, but with a simple API. The proposal introduces a series of flexible local aggregation APIs which could be independent of global aggregation. That doesn't look like an improvement; it introduces a lot of features. I'm not sure whether there is a bigger picture later, but as of now the API part looks a little heavy to me.
> >
> > On Wed, Jun 5, 2019 at 10:38 AM vino yang <yanghua1...@gmail.com> wrote:
> >
> >> Hi Litree,
> >>
> >> At the implementation level, the localKeyBy API returns a general KeyedStream; you can call all the APIs which KeyedStream provides. We did not restrict its usage, although we could do so (for example, by returning a new stream object named LocalKeyedStream).
> >>
> >> However, to achieve the goal of local aggregation, it only makes sense to call the window API.
> >>
> >> Best,
> >> Vino
> >>
> >> On Tue, Jun 4, 2019 at 10:41 PM litree <lyuan...@126.com> wrote:
> >>
> >>> Hi Vino,
> >>>
> >>> I have read your design; what I want to know is the usage of these new APIs. It looks like when I use localKeyBy, I must then use a window operator to return a DataStream, and then use keyBy and another window operator to get the final result?
> >>>
> >>> Thanks,
> >>> Litree
> >>>
> >>> On 06/04/2019 17:22, vino yang wrote:
> >>> Hi Dian,
> >>>
> >>> Thanks for your reply.
> >>>
> >>> I know what you mean. However, if you think about it deeply, you will find that your implementation needs to provide an operator which looks like a window operator: you need to use state, receive an aggregation function, and specify the trigger time. It looks like a lightweight window operator, right?
> >>>
> >>> We try to reuse the functions Flink already provides and reduce complexity. IMO, it is more user-friendly because users are familiar with the window API.
> >>>
> >>> Best,
> >>> Vino
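For reference, the two-stage usage litree asks about would look roughly like the sketch below. Note that localKeyBy is only the API proposed in the design doc, so this does not compile against current Flink; the window sizes and sample data are illustrative.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L))
            .localKeyBy(0)                  // proposed API: keyed locally, no shuffle
            .timeWindow(Time.seconds(10))   // short local window to pre-aggregate
            .sum(1)
            .keyBy(0)                       // existing API: shuffles the partial results
            .timeWindow(Time.seconds(60))   // global window combining the partial sums
            .sum(1)
            .print();

        env.execute("two-stage aggregation sketch");
    }
}

The first window pre-aggregates per key without any shuffle; only its partial sums cross the network to the second, global window.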
> >>>
> >>> On Tue, Jun 4, 2019 at 4:19 PM Dian Fu <dian0511...@gmail.com> wrote:
> >>>
> >>>> Hi Vino,
> >>>>
> >>>> Thanks a lot for starting this discussion. +1 to this feature, as I think it will be very useful.
> >>>>
> >>>> Regarding using a window to buffer the input elements, personally I don't think it's a good solution, for the following reasons:
> >>>> 1) As we know, WindowOperator stores the accumulated results in state, which is not necessary for a local aggregate operator.
> >>>> 2) In WindowOperator, each input element is accumulated into state. This is also not necessary for a local aggregate operator; storing the input elements in memory is enough.
> >>>>
> >>>> Thanks,
> >>>> Dian
> >>>>
> >>>>> On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Ken,
> >>>>>
> >>>>> Thanks for your reply.
> >>>>>
> >>>>> As I said before, we try to reuse Flink's state concept (for fault tolerance and to guarantee exactly-once semantics), so we did not consider a cache.
> >>>>>
> >>>>> In addition, if we use Flink's state, OOM-related issues are not a key problem we need to consider.
> >>>>>
> >>>>> Best,
> >>>>> Vino
> >>>>>
> >>>>> On Tue, Jun 4, 2019 at 1:37 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> Cascading implemented this “map-side reduce” functionality with an LLR cache.
> >>>>>>
> >>>>>> That worked well, as then the skewed keys would always be in the cache.
> >>>>>>
> >>>>>> The API let you decide the size of the cache, in terms of the number of entries.
> >>>>>>
> >>>>>> Having a memory limit would have been better for many of our use cases, though FWIR there’s no good way to estimate the in-memory size of objects.
> >>>>>>
> >>>>>> — Ken
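Assuming Ken's cache behaves like an LRU cache whose evictions are emitted downstream (a guess at the Cascading behavior, not taken from its code), a minimal sketch of such a bounded pre-aggregation cache:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Bounded pre-aggregation cache: partial sums for hot keys stay resident;
// when capacity is exceeded, the least-recently-used entry is emitted
// downstream as a partial result and dropped from the cache.
public class PreAggregatingCache<K> {

    private final Map<K, Long> cache;

    public PreAggregatingCache(int maxEntries, BiConsumer<K, Long> emit) {
        // accessOrder = true turns the LinkedHashMap into an LRU structure
        this.cache = new LinkedHashMap<K, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
                if (size() > maxEntries) {
                    emit.accept(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    public void add(K key, long value) {
        cache.merge(key, value, Long::sum);
    }

    // Flush the remaining partial results, e.g. on checkpoint or shutdown.
    public void flush(BiConsumer<K, Long> emit) {
        cache.forEach(emit);
        cache.clear();
    }
}

Because lookups refresh recency, hot (skewed) keys stay resident while rare keys are evicted early as partial results.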
> >>>>>>
> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi Piotr,
> >>>>>>>
> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just added an inner flag to identify the local mode), which Flink has provided before. Users can call all the APIs (especially the *window* APIs) which KeyedStream provides.
> >>>>>>>
> >>>>>>> So if users want to use local aggregation, they should call the window API to build a local window; that means users should (or rather, can) specify the window length and other information based on their needs.
> >>>>>>>
> >>>>>>> I think you described an idea different from ours. We do not try to react after some predefined threshold is triggered; we tend to give users the discretion to make decisions.
> >>>>>>>
> >>>>>>> Our design tends to reuse the concepts and functions Flink provides, like state and window (IMO, we do not need to worry about OOM and the issues you mentioned).
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Vino
> >>>>>>>
> >>>>>>> On Mon, Jun 3, 2019 at 4:30 PM Piotr Nowojski <pi...@ververica.com> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> +1 for the idea from my side. I even attempted to add a similar feature quite some time ago, but it didn't get enough traction [1].
> >>>>>>>>
> >>>>>>>> I've read through your document and I couldn't find it mentioned anywhere: when should the pre-aggregated result be emitted down the stream? I think that's one of the most crucial decisions, since a wrong decision here can lead to degraded performance or to an explosion of memory/state consumption (with both bounded and unbounded data streams). For streaming it can also lead to increased latency.
> >>>>>>>>
> >>>>>>>> Since this is also a decision that's impossible to make automatically and perfectly reliably, first and foremost I would expect this to be configurable via the API, with maybe some predefined triggers, like on watermark (for windowed operations), on checkpoint barrier (to decrease state size?), on element count, maybe on memory usage (much easier to estimate with known/predefined types, like in SQL)… and with some option to implement a custom trigger.
> >>>>>>>>
> >>>>>>>> Also, what would work best would be some form of memory consumption priority. For example, if we are running out of memory for the HashJoin/final aggregation, instead of spilling to disk or crashing the job with an OOM it would probably be better to prune/dump the pre/local aggregation state. But that's another story.
> >>>>>>>>
> >>>>>>>> [1] https://github.com/apache/flink/pull/4626
> >>>>>>>>
> >>>>>>>> Piotrek
> >>>>>>>>
> >>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Excited, and a big +1 for this feature.
> >>>>>>>>>
> >>>>>>>>> On Mon, Jun 3, 2019 at 3:37 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Nice feature. Looking forward to having it in Flink.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Xiaogang
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Jun 3, 2019 at 3:31 PM vino yang <yanghua1...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> As we mentioned at some conferences, such as Flink Forward SF 2019 and QCon Beijing 2019, our team has implemented "local aggregation" in our internal Flink fork. This feature can effectively alleviate data skew.
> >>>>>>>>>>>
> >>>>>>>>>>> Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum, and window) on elements that have the same key. When executed at runtime, the elements with the same key will be sent to and aggregated by the same task.
> >>>>>>>>>>>
> >>>>>>>>>>> The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, the performance will be significantly degraded. Worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.
> >>>>>>>>>>>
> >>>>>>>>>>> Local aggregation is a widely adopted method to mitigate the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements with the same key at the sender side to obtain partial results. In the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, by reducing the amount of transferred data, the performance can be further improved.
> >>>>>>>>>>>
> >>>>>>>>>>> The design documentation is here:
> >>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>>>>>>>>>
> >>>>>>>>>>> Any comments and feedback are welcome and appreciated.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Vino
> >>>>>>
> >>>>>> --------------------------
> >>>>>> Ken Krugler
> >>>>>> +1 530-210-6378
> >>>>>> http://www.scaleunlimited.com
> >>>>>> Custom big data solutions & training
> >>>>>> Flink, Solr, Hadoop, Cascading & Cassandra