hi +1, nice work

best,
forwardxu

boshu Zheng <kisim...@163.com> wrote on Thu, Jun 6, 2019 at 4:30 PM:

> Hi,
>
> +1 from my side. Looking forward to this must-have feature :)
>
> Best,
> boshu
>
> At 2019-06-05 16:33:13, "vino yang" <yanghua1...@gmail.com> wrote:
>
>> Hi Aljoscha,
>>
>> What do you think about this feature and design document?
>>
>> Best,
>> Vino
>>
>> vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 5, 2019 at 4:18 PM:
>>
>>> Hi Dian,
>>>
>>> I still think your implementation is similar to the window operator.
>>> You mentioned the scalable trigger mechanism, but the window API can
>>> also customize the trigger.
>>>
>>> Moreover, IMO, the design should guarantee deterministic semantics; I
>>> think a design based on memory availability is non-deterministic.
>>>
>>> In addition, if the implementation depends on the timing of
>>> checkpoints, I do not think it is reasonable; we should avoid
>>> affecting the checkpoint's progress.
>>>
>>> Best,
>>> Vino
>>>
>>> Dian Fu <dian0511...@gmail.com> wrote on Wed, Jun 5, 2019 at 1:55 PM:
>>>
>>>> Hi Vino,
>>>>
>>>> Thanks a lot for your reply.
>>>>
>>>> > 1) When, why and how do we judge that memory is exhausted?
>>>>
>>>> My point here is that the local aggregate operator can buffer the
>>>> inputs in memory and send out the results AT ANY TIME: when the
>>>> element count or the time interval reaches a pre-configured value,
>>>> when the memory usage of the buffered elements reaches a configured
>>>> value (supposing we can estimate the object size efficiently), or
>>>> even when a checkpoint is triggered.
>>>>
>>>> > 2) If the local aggregate operator rarely needs to operate on
>>>> > state, what do you think about fault tolerance?
>>>>
>>>> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier`
>>>> which can be used here to send the results downstream when a
>>>> checkpoint is triggered. Then fault tolerance can work well.
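The buffering idea Dian describes above (keep partial aggregates purely in memory, emit on a count or time trigger, and flush everything just before the checkpoint barrier so nothing extra has to be snapshotted) can be sketched as a toy simulation. This is illustrative Python rather than Flink's Java operator API; `LocalAggBuffer` and its method names are invented for the sketch:

```python
from collections import defaultdict

class LocalAggBuffer:
    """In-memory pre-aggregation buffer, flushed before each checkpoint barrier."""
    def __init__(self, agg_fn, max_count, emit):
        self.agg_fn = agg_fn          # e.g. lambda a, b: a + b
        self.max_count = max_count    # count trigger threshold
        self.emit = emit              # callback sending (key, partial) downstream
        self.partials = defaultdict(lambda: None)
        self.count = 0

    def process_element(self, key, value):
        cur = self.partials[key]
        self.partials[key] = value if cur is None else self.agg_fn(cur, value)
        self.count += 1
        if self.count >= self.max_count:   # count trigger fired
            self.flush()

    def prepare_snapshot_pre_barrier(self):
        # Flush all partials downstream, so no buffered element has to be
        # written to operator state for the checkpoint.
        self.flush()

    def flush(self):
        for key, partial in self.partials.items():
            self.emit(key, partial)
        self.partials.clear()
        self.count = 0
```

A time-interval trigger would work the same way, calling `flush()` from a timer; the point is that after the pre-barrier flush the buffer is empty, so the checkpoint has nothing extra to persist.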
>>>> Even if there were no such method available, we could still store
>>>> the buffered elements or pre-aggregated results to state when a
>>>> checkpoint is triggered. The state access will be much lower
>>>> compared with the window operator, as only the elements not yet sent
>>>> out when the checkpoint occurs have to be written to state. Suppose
>>>> the checkpoint interval is 3 minutes and the trigger interval is 10
>>>> seconds; then roughly less than 10/180 of the elements will be
>>>> written to state.
>>>>
>>>> Thanks,
>>>> Dian
>>>>
>>>> On Jun 5, 2019, at 11:43 AM, Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>>> Hi Vino,
>>>>>
>>>>> +1 for this feature. It's useful for data skew. And it could also
>>>>> reduce the amount of shuffled data.
>>>>>
>>>>> I have some concerns about the API part. From my side, this feature
>>>>> should be more of an improvement. I'm afraid the proposal is
>>>>> overkill regarding the API part. Many other systems support
>>>>> pre-aggregation as an optimization of global aggregation. The
>>>>> optimization might be applied automatically or manually, but with a
>>>>> simple API. The proposal introduces a series of flexible local
>>>>> aggregation APIs, which could be independent of global aggregation.
>>>>> It doesn't look like an improvement but rather introduces a lot of
>>>>> features. I'm not sure if there is a bigger picture later. As of
>>>>> now, the API part looks a little heavy to me.
>>>>>
>>>>> vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 5, 2019 at 10:38 AM:
>>>>>
>>>>>> Hi Litree,
>>>>>>
>>>>>> At the implementation level, the localKeyBy API returns a general
>>>>>> KeyedStream; you can call all the APIs which KeyedStream provides.
>>>>>> We did not restrict its usage, although we could do so (for
>>>>>> example, by returning a new stream object named LocalKeyedStream).
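Dian's 10/180 estimate above can be sanity-checked with a little arithmetic, under the assumption of a uniform arrival rate: at checkpoint time, only the elements that arrived since the most recent trigger flush are still buffered. The helper below is purely illustrative:

```python
def buffered_fraction(checkpoint_s, trigger_s, rate_per_s):
    """Fraction of a checkpoint interval's elements still buffered at
    checkpoint time, assuming the buffer is flushed every `trigger_s`
    seconds and, in the worst case, the checkpoint fires just before
    the next flush would have happened."""
    total = checkpoint_s * rate_per_s
    # Worst case: a full trigger interval's worth of elements is buffered.
    since_last_flush = (checkpoint_s % trigger_s or trigger_s) * rate_per_s
    return since_last_flush / total

# 3-minute checkpoints, 10-second trigger flushes, 1000 elements/s:
# at most ~10s of elements (10/180 of the interval) need snapshotting.
worst_case = buffered_fraction(180, 10, 1000)
```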
>>>>>> However, to achieve the goal of local aggregation, it only makes
>>>>>> sense to call the window API.
>>>>>>
>>>>>> Best,
>>>>>> Vino
>>>>>>
>>>>>> litree <lyuan...@126.com> wrote on Tue, Jun 4, 2019 at 10:41 PM:
>>>>>>
>>>>>>> Hi Vino,
>>>>>>>
>>>>>>> I have read your design; something I want to know is the usage of
>>>>>>> these new APIs. It looks like when I use localKeyBy, I must then
>>>>>>> use a window operator to return a DataStream, and then use keyBy
>>>>>>> and another window operator to get the final result?
>>>>>>>
>>>>>>> thanks,
>>>>>>> Litree
>>>>>>>
>>>>>>> On 06/04/2019 17:22, vino yang wrote:
>>>>>>> Hi Dian,
>>>>>>>
>>>>>>> Thanks for your reply.
>>>>>>>
>>>>>>> I know what you mean. However, if you think deeply, you will find
>>>>>>> that your implementation needs to provide an operator which looks
>>>>>>> like a window operator: you need to use state, receive an
>>>>>>> aggregation function and specify the trigger time. It looks like
>>>>>>> a lightweight window operator, right?
>>>>>>>
>>>>>>> We try to reuse the functions Flink provides and reduce
>>>>>>> complexity. IMO, it is more user-friendly because users are
>>>>>>> familiar with the window API.
>>>>>>>
>>>>>>> Best,
>>>>>>> Vino
>>>>>>>
>>>>>>> Dian Fu <dian0511...@gmail.com> wrote on Tue, Jun 4, 2019 at 4:19 PM:
>>>>>>>
>>>>>>>> Hi Vino,
>>>>>>>>
>>>>>>>> Thanks a lot for starting this discussion. +1 to this feature,
>>>>>>>> as I think it will be very useful.
>>>>>>>>
>>>>>>>> Regarding using a window to buffer the input elements,
>>>>>>>> personally I don't think it's a good solution, for the following
>>>>>>>> reasons:
>>>>>>>> 1) As we know, WindowOperator stores the accumulated results in
>>>>>>>> state; this is not necessary for a local aggregate operator.
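The two-step usage discussed here (localKeyBy → window to emit partial results, then keyBy → window to combine them) can be simulated in plain Python to show it produces the same totals as a single global aggregation. Since localKeyBy is only a proposal at this point, parallel instances are modelled simply as lists:

```python
from collections import defaultdict

def window_sum(records):
    """One window pass: sum values per key, emitting (key, partial) pairs."""
    acc = defaultdict(int)
    for key, value in records:
        acc[key] += value
    return list(acc.items())

def two_stage_sum(partitions):
    # Stage 1 ("localKeyBy" + window): each parallel instance
    # pre-aggregates its own slice, with no shuffle involved.
    partials = [pair for part in partitions for pair in window_sum(part)]
    # Stage 2 (keyBy + window): shuffle the partials by key and combine.
    return dict(window_sum(partials))

# A skewed workload: key "hot" dominates both source partitions.
parts = [[("hot", 1)] * 5 + [("a", 2)],
         [("hot", 1)] * 7 + [("b", 3)]]
```

The second window sees at most one partial per key per upstream instance, which is what tames the skew.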
>>>>>>>> 2) For WindowOperator, each input element will be accumulated
>>>>>>>> to state. This is also not necessary for a local aggregate
>>>>>>>> operator; storing the input elements in memory is enough.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dian
>>>>>>>>
>>>>>>>> On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Ken,
>>>>>>>>>
>>>>>>>>> Thanks for your reply.
>>>>>>>>>
>>>>>>>>> As I said before, we try to reuse Flink's state concept (fault
>>>>>>>>> tolerance and guaranteeing "exactly-once" semantics). So we did
>>>>>>>>> not consider a cache.
>>>>>>>>>
>>>>>>>>> In addition, if we use Flink's state, OOM-related issues are
>>>>>>>>> not a key problem we need to consider.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Vino
>>>>>>>>>
>>>>>>>>> Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Cascading implemented this “map-side reduce” functionality
>>>>>>>>>> with an LLR cache.
>>>>>>>>>>
>>>>>>>>>> That worked well, as then the skewed keys would always be in
>>>>>>>>>> the cache.
>>>>>>>>>>
>>>>>>>>>> The API let you decide the size of the cache, in terms of the
>>>>>>>>>> number of entries.
>>>>>>>>>>
>>>>>>>>>> Having a memory limit would have been better for many of our
>>>>>>>>>> use cases, though FWIR there’s no good way to estimate the
>>>>>>>>>> in-memory size of objects.
>>>>>>>>>>
>>>>>>>>>> — Ken
>>>>>>>>>>
>>>>>>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Piotr,
>>>>>>>>>>>
>>>>>>>>>>> The localKeyBy API returns an instance of KeyedStream (we
>>>>>>>>>>> just added an inner flag to identify the local mode), which
>>>>>>>>>>> Flink has provided before.
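Ken's Cascading-style map-side reduce keeps partial results in a bounded cache so skewed (hot) keys stay resident, and emits a partial result downstream whenever a key is evicted. A minimal sketch using an LRU eviction policy (class and method names are invented here; Cascading's actual cache may differ):

```python
from collections import OrderedDict

class LruPreAggregator:
    """Map-side pre-aggregation with a bounded LRU cache of partial
    results. Evicted entries are emitted downstream; hot keys tend to
    stay cached, so the skewed keys are aggregated most effectively."""
    def __init__(self, agg_fn, capacity, emit):
        self.agg_fn = agg_fn
        self.capacity = capacity
        self.emit = emit
        self.cache = OrderedDict()

    def add(self, key, value):
        if key in self.cache:
            self.cache[key] = self.agg_fn(self.cache[key], value)
            self.cache.move_to_end(key)      # mark as recently used
        else:
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                old_key, partial = self.cache.popitem(last=False)
                self.emit(old_key, partial)  # evict coldest partial result

    def close(self):
        # End of input: emit whatever partials remain.
        for key, partial in self.cache.items():
            self.emit(key, partial)
        self.cache.clear()
```

As Ken notes, the bound is in entries, not bytes; a true memory bound would need per-object size estimation.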
>>>>>>>>>>> Users can call all the APIs (especially the *window* APIs)
>>>>>>>>>>> which KeyedStream provides.
>>>>>>>>>>>
>>>>>>>>>>> So if users want to use local aggregation, they should call
>>>>>>>>>>> the window API to build a local window; that means users
>>>>>>>>>>> should (or say "can") specify the window length and other
>>>>>>>>>>> information based on their needs.
>>>>>>>>>>>
>>>>>>>>>>> I think you described an idea different from ours. We did not
>>>>>>>>>>> try to react after some predefined threshold is triggered. We
>>>>>>>>>>> tend to give users the discretion to make decisions.
>>>>>>>>>>>
>>>>>>>>>>> Our design tends to reuse the concepts and functions Flink
>>>>>>>>>>> provides, like state and window (IMO, we do not need to worry
>>>>>>>>>>> about OOM and the issues you mentioned).
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Vino
>>>>>>>>>>>
>>>>>>>>>>> Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> +1 for the idea from my side. I’ve even attempted to add a
>>>>>>>>>>>> similar feature quite some time ago, but it didn’t get
>>>>>>>>>>>> enough traction [1].
>>>>>>>>>>>>
>>>>>>>>>>>> I’ve read through your document and I couldn’t find it
>>>>>>>>>>>> mentioned anywhere: when should the pre-aggregated result be
>>>>>>>>>>>> emitted down the stream? I think that’s one of the most
>>>>>>>>>>>> crucial decisions, since a wrong decision here can lead to a
>>>>>>>>>>>> decrease in performance or to an explosion of memory/state
>>>>>>>>>>>> consumption (with both bounded and unbounded data streams).
>>>>>>>>>>>> For streaming it can also lead to increased latency.
>>>>>>>>>>>>
>>>>>>>>>>>> Since this is also a decision that’s impossible to make
>>>>>>>>>>>> automatically and perfectly reliably, first and foremost I
>>>>>>>>>>>> would expect this to be configurable via the API. With maybe
>>>>>>>>>>>> some predefined triggers, like on watermark (for windowed
>>>>>>>>>>>> operations), on checkpoint barrier (to decrease state
>>>>>>>>>>>> size?), on element count, maybe on memory usage (much easier
>>>>>>>>>>>> to estimate with known/predefined types, like in SQL)… and
>>>>>>>>>>>> with some option to implement a custom trigger.
>>>>>>>>>>>>
>>>>>>>>>>>> What would also work well would be some form of memory
>>>>>>>>>>>> consumption priority. For example, if we are running out of
>>>>>>>>>>>> memory for a HashJoin/final aggregation, instead of spilling
>>>>>>>>>>>> to disk or crashing the job with an OOM it would probably be
>>>>>>>>>>>> better to prune/dump the pre/local aggregation state. But
>>>>>>>>>>>> that’s another story.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/4626
>>>>>>>>>>>>
>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>
>>>>>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Excited, and a big +1 for this feature.
>>>>>>>>>>>>>
>>>>>>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Nice feature.
>>>>>>>>>>>>>> Looking forward to having it in Flink.
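Piotr's trigger list above (element count, checkpoint barrier, watermark, memory usage, plus custom triggers) amounts to a small pluggable interface. A hedged sketch of what composable emit-triggers could look like; none of these names exist in Flink, and the real API would be defined in Java:

```python
class CountTrigger:
    """Fire after a fixed number of buffered elements."""
    def __init__(self, max_count):
        self.max_count = max_count
        self.seen = 0

    def on_element(self, element):
        self.seen += 1
        if self.seen >= self.max_count:
            self.seen = 0
            return True   # tell the operator to flush its partials
        return False

    def on_checkpoint_barrier(self):
        return True       # always flush before a barrier to keep state small

class ComposedTrigger:
    """Fire when any child trigger fires (custom triggers plug in here)."""
    def __init__(self, *triggers):
        self.triggers = triggers

    def on_element(self, element):
        # List comprehension (not a generator) so every trigger sees
        # every element even after one of them has already fired.
        return any([t.on_element(element) for t in self.triggers])

    def on_checkpoint_barrier(self):
        return any([t.on_checkpoint_barrier() for t in self.triggers])
```

Watermark-, time- or memory-based triggers would follow the same shape, each deciding independently when the operator should emit.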
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Xiaogang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As we mentioned at some conferences, such as Flink
>>>>>>>>>>>>>>> Forward SF 2019 and QCon Beijing 2019, our team has
>>>>>>>>>>>>>>> implemented "local aggregation" in our internal Flink
>>>>>>>>>>>>>>> fork. This feature can effectively alleviate data skew.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Currently, keyed streams are widely used to perform
>>>>>>>>>>>>>>> aggregating operations (e.g., reduce, sum and window) on
>>>>>>>>>>>>>>> the elements that have the same key. When executed at
>>>>>>>>>>>>>>> runtime, the elements with the same key will be sent to
>>>>>>>>>>>>>>> and aggregated by the same task.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The performance of these aggregating operations is very
>>>>>>>>>>>>>>> sensitive to the distribution of keys. In cases where the
>>>>>>>>>>>>>>> distribution of keys follows a power law, performance
>>>>>>>>>>>>>>> will be significantly degraded. Even more unluckily,
>>>>>>>>>>>>>>> increasing the degree of parallelism does not help when a
>>>>>>>>>>>>>>> task is overloaded by a single key.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Local aggregation is a widely adopted method to reduce
>>>>>>>>>>>>>>> the performance degradation caused by data skew. We can
>>>>>>>>>>>>>>> decompose the aggregating operations into two phases. In
>>>>>>>>>>>>>>> the first phase, we aggregate the elements with the same
>>>>>>>>>>>>>>> key at the sender side to obtain partial results.
>>>>>>>>>>>>>>> Then, in the second phase, these partial results are
>>>>>>>>>>>>>>> sent to receivers according to their keys and are
>>>>>>>>>>>>>>> combined to obtain the final result. Since the number of
>>>>>>>>>>>>>>> partial results received by each receiver is limited by
>>>>>>>>>>>>>>> the number of senders, the imbalance among receivers can
>>>>>>>>>>>>>>> be reduced. Besides, by reducing the amount of
>>>>>>>>>>>>>>> transferred data, the performance can be further
>>>>>>>>>>>>>>> improved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The design document is here:
>>>>>>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any comments and feedback are welcome and appreciated.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Vino
>>>>>>>>>>
>>>>>>>>>> --------------------------
>>>>>>>>>> Ken Krugler
>>>>>>>>>> +1 530-210-6378
>>>>>>>>>> http://www.scaleunlimited.com
>>>>>>>>>> Custom big data solutions & training
>>>>>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
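The receiver-side bound stated in the proposal (each receiver gets at most one partial result per key per sender) can be illustrated by counting shuffled records for a skewed key, assuming hash partitioning of keys to receivers; the helper below is purely illustrative:

```python
from collections import Counter

def records_per_receiver(partitions, receivers, pre_aggregate):
    """Count how many records each receiver gets, with or without the
    local (sender-side) pre-aggregation phase."""
    load = Counter()
    for part in partitions:
        if pre_aggregate:
            # Phase 1: collapse each sender's records to one partial per key.
            outgoing = set(k for k, _ in part)
        else:
            outgoing = [k for k, _ in part]
        for key in outgoing:
            load[hash(key) % receivers] += 1
    return load

# 4 senders, each holding 1000 records of the single hot key.
parts = [[("hot", 1)] * 1000 for _ in range(4)]
```

With pre-aggregation the hot key's receiver sees at most 4 partials (one per sender) instead of 4000 raw records, which is exactly the bound the proposal relies on.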