Hi Aljoscha,

What do you think about this feature and design document?
Best,
Vino

On Wed, Jun 5, 2019 at 4:18 PM, vino yang <yanghua1...@gmail.com> wrote:

> Hi Dian,
>
> I still think your implementation is similar to the window operator. You
> mentioned a scalable trigger mechanism, but the window API can also use
> custom triggers.
>
> Moreover, IMO, the design should guarantee deterministic semantics; I
> think triggering based on memory availability is non-deterministic.
>
> In addition, if the implementation depends on the timing of checkpoints,
> I do not think that is reasonable; we should avoid affecting checkpoint
> progress.
>
> Best,
> Vino
>
> On Wed, Jun 5, 2019 at 1:55 PM, Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Vino,
>>
>> Thanks a lot for your reply.
>>
>>> 1) When, why, and how do we judge that memory is exhausted?
>>
>> My point here is that the local aggregate operator can buffer the
>> inputs in memory and send out the results AT ANY TIME: when the element
>> count or the time interval reaches a pre-configured value, when the
>> memory usage of the buffered elements reaches a configured value
>> (supposing we can estimate the object size efficiently), or even when a
>> checkpoint is triggered.
>>
>>> 2) If the local aggregate operator rarely needs to operate on state,
>>> what do you think about fault tolerance?
>>
>> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier`
>> which can be used here to send the results downstream when a checkpoint
>> is triggered. Then fault tolerance can work well.
>>
>> Even if there were no such method available, we could still store the
>> buffered elements or pre-aggregated results to state when a checkpoint
>> is triggered. The state access will be much lower compared with the
>> window operator, as only the elements not yet sent out when the
>> checkpoint occurs have to be written to state. Suppose the checkpoint
>> interval is 3 minutes and the trigger interval is 10 seconds; then only
>> about 10/180 of the elements will be written to state.
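[Editor's note: the buffering strategy Dian describes above (keep per-key partial results in memory and emit them when a count, time, or checkpoint trigger fires) can be sketched in plain Java without any Flink dependency. All class and method names below are illustrative, not part of a Flink API; a real operator would flush inside `prepareSnapshotPreBarrier` instead of returning maps.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Minimal sketch of a local pre-aggregation buffer: partial results are
 * combined per key in memory and flushed downstream when a trigger fires
 * (here, a distinct-key count threshold). Nothing is written to state;
 * flushing before the checkpoint barrier makes the buffer safe to lose.
 */
public class LocalPreAggregator<K, V> {
    private final Map<K, V> buffer = new HashMap<>();
    private final BinaryOperator<V> combiner;
    private final int flushThreshold;

    public LocalPreAggregator(BinaryOperator<V> combiner, int flushThreshold) {
        this.combiner = combiner;
        this.flushThreshold = flushThreshold;
    }

    /** Accumulate one element; returns the partial results if the trigger fired. */
    public Map<K, V> add(K key, V value) {
        buffer.merge(key, value, combiner);
        return buffer.size() >= flushThreshold ? flush() : Map.of();
    }

    /** Emit everything buffered, e.g. on a timer or just before a checkpoint barrier. */
    public Map<K, V> flush() {
        Map<K, V> out = new HashMap<>(buffer);
        buffer.clear();
        return out;
    }
}
```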
>>
>> Thanks,
>> Dian
>>
>> On Jun 5, 2019, at 11:43 AM, Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Vino,
>>>
>>> +1 for this feature. It's useful for data skew, and it could also
>>> reduce the amount of shuffled data.
>>>
>>> I have some concerns about the API part. From my side, this feature
>>> should be more of an improvement. I'm afraid the proposal is overkill
>>> on the API side. Many other systems support pre-aggregation as an
>>> optimization of global aggregation. The optimization might be applied
>>> automatically or manually, but with a simple API. The proposal
>>> introduces a series of flexible local aggregation APIs which could be
>>> independent of global aggregation. That doesn't look like an
>>> improvement; it introduces a lot of features. I'm not sure if there is
>>> a bigger picture later; as of now, the API part looks a little heavy
>>> to me.
>>>
>>> On Wed, Jun 5, 2019 at 10:38 AM, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Litree,
>>>>
>>>> At the implementation level, the localKeyBy API returns a general
>>>> KeyedStream; you can call all the APIs which KeyedStream provides. We
>>>> did not restrict its usage, although we could (for example, by
>>>> returning a new stream object named LocalKeyedStream).
>>>>
>>>> However, to achieve the goal of local aggregation, it only makes
>>>> sense to call the window API.
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> On Tue, Jun 4, 2019 at 10:41 PM, litree <lyuan...@126.com> wrote:
>>>>
>>>>> Hi Vino,
>>>>>
>>>>> I have read your design. Something I want to know is the usage of
>>>>> these new APIs. It looks like when I use localKeyBy, I must then use
>>>>> a window operator to return a DataStream, and then use keyBy and
>>>>> another window operator to get the final result?
>>>>>
>>>>> Thanks,
>>>>> Litree
>>>>>
>>>>> On 06/04/2019 17:22, vino yang wrote:
>>>>> Hi Dian,
>>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> I know what you mean. However, if you think it through, you will
>>>>> find that your implementation needs to provide an operator which
>>>>> looks like a window operator: you need to use state, receive an
>>>>> aggregation function, and specify the trigger time. It looks like a
>>>>> lightweight window operator, right?
>>>>>
>>>>> We try to reuse the functions Flink provides and reduce complexity.
>>>>> IMO, it is more user-friendly because users are familiar with the
>>>>> window API.
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>> On Tue, Jun 4, 2019 at 4:19 PM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>
>>>>>> Hi Vino,
>>>>>>
>>>>>> Thanks a lot for starting this discussion. +1 to this feature, as I
>>>>>> think it will be very useful.
>>>>>>
>>>>>> Regarding using a window to buffer the input elements, personally I
>>>>>> don't think it's a good solution, for the following reasons:
>>>>>> 1) As we know, WindowOperator stores the accumulated results in
>>>>>> state; this is not necessary for a local aggregate operator.
>>>>>> 2) In WindowOperator, each input element is accumulated to state.
>>>>>> This is also not necessary for a local aggregate operator; storing
>>>>>> the input elements in memory is enough.
>>>>>>
>>>>>> Thanks,
>>>>>> Dian
>>>>>>
>>>>>> On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Ken,
>>>>>>>
>>>>>>> Thanks for your reply.
>>>>>>>
>>>>>>> As I said before, we try to reuse Flink's state concept (for fault
>>>>>>> tolerance and to guarantee exactly-once semantics), so we did not
>>>>>>> consider a cache.
>>>>>>>
>>>>>>> In addition, if we use Flink's state, OOM-related issues are not a
>>>>>>> key problem we need to consider.
>>>>>>>
>>>>>>> Best,
>>>>>>> Vino
>>>>>>>
>>>>>>> On Tue, Jun 4, 2019 at 1:37 AM, Ken Krugler
>>>>>>> <kkrugler_li...@transpac.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Cascading implemented this "map-side reduce" functionality with
>>>>>>>> an LRU cache.
>>>>>>>>
>>>>>>>> That worked well, as the skewed keys would then always be in the
>>>>>>>> cache.
>>>>>>>>
>>>>>>>> The API let you decide the size of the cache, in terms of the
>>>>>>>> number of entries.
>>>>>>>>
>>>>>>>> Having a memory limit would have been better for many of our use
>>>>>>>> cases, though as far as I recall there's no good way to estimate
>>>>>>>> the in-memory size of objects.
>>>>>>>>
>>>>>>>> — Ken
>>>>>>>>
>>>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Piotr,
>>>>>>>>>
>>>>>>>>> The localKeyBy API returns an instance of KeyedStream (we just
>>>>>>>>> added an inner flag to identify the local mode), which Flink has
>>>>>>>>> provided before. Users can call all the APIs (especially the
>>>>>>>>> *window* APIs) which KeyedStream provides.
>>>>>>>>>
>>>>>>>>> So if users want to use local aggregation, they should call the
>>>>>>>>> window API to build a local window. That means users should (or
>>>>>>>>> say "can") specify the window length and other information based
>>>>>>>>> on their needs.
>>>>>>>>>
>>>>>>>>> I think you described an idea different from ours. We did not
>>>>>>>>> try to react after some predefined threshold is triggered; we
>>>>>>>>> tend to give users the discretion to make these decisions.
>>>>>>>>>
>>>>>>>>> Our design tends to reuse the concepts and functions Flink
>>>>>>>>> provides, like state and windows (IMO, we then do not need to
>>>>>>>>> worry about OOM and the issues you mentioned).
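[Editor's note: the Cascading-style cache Ken describes can be approximated with Java's access-ordered `LinkedHashMap`: hot (skewed) keys stay in the fixed-size cache, while cold keys are evicted as partial results for the downstream combiner. This is a hypothetical sketch, not Cascading's actual implementation.]

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/**
 * Map-side pre-aggregation with a bounded LRU cache. Partial aggregates
 * for frequently-seen keys stay cached; when the cache overflows, the
 * least-recently-used entry is evicted and queued for emission downstream,
 * where it must be combined again with other partials of the same key.
 */
public class LruPreAggregator<K, V> {
    private final List<Map.Entry<K, V>> evicted = new ArrayList<>();
    private final Map<K, V> cache;
    private final BinaryOperator<V> combiner;

    public LruPreAggregator(int maxEntries, BinaryOperator<V> combiner) {
        this.combiner = combiner;
        // access-ordered LinkedHashMap: the least-recently-used entry is eldest
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    evicted.add(Map.entry(eldest.getKey(), eldest.getValue()));
                    return true;
                }
                return false;
            }
        };
    }

    public void add(K key, V value) {
        cache.merge(key, value, combiner);
    }

    /** Partial results pushed out by evictions since the last drain. */
    public List<Map.Entry<K, V>> drainEvicted() {
        List<Map.Entry<K, V>> out = new ArrayList<>(evicted);
        evicted.clear();
        return out;
    }
}
```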
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Vino
>>>>>>>>>
>>>>>>>>> On Mon, Jun 3, 2019 at 4:30 PM, Piotr Nowojski
>>>>>>>>> <pi...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> +1 for the idea from my side. I even attempted to add a similar
>>>>>>>>>> feature quite some time ago, but it didn't get enough traction
>>>>>>>>>> [1].
>>>>>>>>>>
>>>>>>>>>> I've read through your document and I couldn't find it
>>>>>>>>>> mentioned anywhere: when should the pre-aggregated result be
>>>>>>>>>> emitted downstream? I think that's one of the most crucial
>>>>>>>>>> decisions, since a wrong decision here can lead to decreased
>>>>>>>>>> performance or to an explosion of memory/state consumption
>>>>>>>>>> (with both bounded and unbounded data streams). For streaming
>>>>>>>>>> it can also lead to increased latency.
>>>>>>>>>>
>>>>>>>>>> Since this is also a decision that is impossible to make
>>>>>>>>>> automatically and perfectly reliably, first and foremost I
>>>>>>>>>> would expect this to be configurable via the API, with maybe
>>>>>>>>>> some predefined triggers: on watermark (for windowed
>>>>>>>>>> operations), on checkpoint barrier (to decrease state size?),
>>>>>>>>>> on element count, maybe on memory usage (much easier to
>>>>>>>>>> estimate with known/predefined types, as in SQL)... and with
>>>>>>>>>> some option to implement a custom trigger.
>>>>>>>>>>
>>>>>>>>>> What would also work well would be some form of memory
>>>>>>>>>> consumption priority. For example, if we are running out of
>>>>>>>>>> memory for a HashJoin/final aggregation, instead of spilling to
>>>>>>>>>> disk or crashing the job with an OOM it would probably be
>>>>>>>>>> better to prune/dump the pre/local aggregation state. But
>>>>>>>>>> that's another story.
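[Editor's note: the pluggable flush triggers Piotr suggests (element count, time interval, checkpoint barrier, memory usage, custom) could be modeled behind a small interface. The names below are hypothetical, not an existing Flink API; a count trigger and a time trigger are shown as examples.]

```java
/**
 * A pluggable policy deciding when a local aggregation buffer should be
 * flushed downstream. The operator calls onElement for every buffered
 * element; other trigger points (checkpoint barrier, watermark, memory
 * pressure) could be added as further callbacks.
 */
public interface FlushTrigger {
    /** Return true if the local buffer should be flushed now. */
    boolean onElement(long bufferedCount, long nowMillis);

    /** Fire whenever at least n elements are buffered. */
    static FlushTrigger countTrigger(long n) {
        return (count, now) -> count >= n;
    }

    /** Fire when intervalMillis has elapsed since the last flush. */
    static FlushTrigger timeTrigger(long intervalMillis) {
        return new FlushTrigger() {
            private long lastFlush = -1;

            @Override
            public boolean onElement(long bufferedCount, long nowMillis) {
                if (lastFlush < 0) {
                    lastFlush = nowMillis; // first element starts the clock
                }
                if (nowMillis - lastFlush >= intervalMillis) {
                    lastFlush = nowMillis;
                    return true;
                }
                return false;
            }
        };
    }
}
```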
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/flink/pull/4626
>>>>>>>>>>
>>>>>>>>>> Piotrek
>>>>>>>>>>
>>>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Excited, and a big +1 for this feature.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jun 3, 2019 at 3:37 PM, SHI Xiaogang
>>>>>>>>>>> <shixiaoga...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Nice feature.
>>>>>>>>>>>> Looking forward to having it in Flink.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Xiaogang
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jun 3, 2019 at 3:31 PM, vino yang
>>>>>>>>>>>> <yanghua1...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As we mentioned at some conferences, such as Flink Forward
>>>>>>>>>>>>> SF 2019 and QCon Beijing 2019, our team has implemented
>>>>>>>>>>>>> "local aggregation" in our internal Flink fork. This feature
>>>>>>>>>>>>> can effectively alleviate data skew.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently, keyed streams are widely used to perform
>>>>>>>>>>>>> aggregating operations (e.g., reduce, sum and window) on
>>>>>>>>>>>>> elements that have the same key. When executed at runtime,
>>>>>>>>>>>>> the elements with the same key are sent to and aggregated by
>>>>>>>>>>>>> the same task.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The performance of these aggregating operations is very
>>>>>>>>>>>>> sensitive to the distribution of keys. In cases where the
>>>>>>>>>>>>> distribution of keys follows a power law, performance is
>>>>>>>>>>>>> significantly degraded. Worse still, increasing the degree
>>>>>>>>>>>>> of parallelism does not help when a task is overloaded by a
>>>>>>>>>>>>> single key.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Local aggregation is a widely-adopted method to reduce the
>>>>>>>>>>>>> performance degradation caused by data skew. We can
>>>>>>>>>>>>> decompose the aggregating operations into two phases. In the
>>>>>>>>>>>>> first phase, we aggregate the elements of the same key at
>>>>>>>>>>>>> the sender side to obtain partial results. Then, in the
>>>>>>>>>>>>> second phase, these partial results are sent to receivers
>>>>>>>>>>>>> according to their keys and are combined to obtain the final
>>>>>>>>>>>>> result. Since the number of partial results received by each
>>>>>>>>>>>>> receiver is limited by the number of senders, the imbalance
>>>>>>>>>>>>> among receivers can be reduced. Besides, by reducing the
>>>>>>>>>>>>> amount of transferred data, performance can be further
>>>>>>>>>>>>> improved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The design documentation is here:
>>>>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any comments and feedback are welcome and appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Vino
>>>>>>>>
>>>>>>>> --------------------------
>>>>>>>> Ken Krugler
>>>>>>>> +1 530-210-6378
>>>>>>>> http://www.scaleunlimited.com
>>>>>>>> Custom big data solutions & training
>>>>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
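[Editor's note: the two-phase decomposition described in the announcement can be illustrated with a plain-Java count-by-key example: each sender collapses its own elements into at most one partial result per key (phase 1), so a receiver combines at most one partial per sender per key (phase 2), however skewed the raw key distribution is. All names are illustrative.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Two-phase (local + global) aggregation for a per-key count. Phase 1
 * runs on each sender; phase 2 runs on the receivers after the shuffle.
 */
public class TwoPhaseCount {

    /** Phase 1: sender-side pre-aggregation of that sender's elements. */
    public static Map<String, Long> localAggregate(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String k : keys) {
            partial.merge(k, 1L, Long::sum);
        }
        return partial;
    }

    /** Phase 2: combine the partial results from all senders by key. */
    public static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }
}
```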