Hi,
+1 from my side. Looking forward to this must-have feature :)

Best,
boshu

At 2019-06-05 16:33:13, "vino yang" <yanghua1...@gmail.com> wrote:
>Hi Aljoscha,
>
>What do you think about this feature and the design document?
>
>Best,
>Vino
>
>vino yang <yanghua1...@gmail.com> wrote on Wednesday, June 5, 2019 at 4:18 PM:
>
>> Hi Dian,
>>
>> I still think your implementation is similar to the window operator. You
>> mentioned the scalable trigger mechanism; the window API can also customize
>> the trigger.
>>
>> Moreover, IMO, the design should guarantee deterministic semantics, and I
>> think triggering based on memory availability is a non-deterministic design.
>>
>> In addition, if the implementation depends on the timing of checkpoints, I
>> do not think that is reasonable; we should avoid affecting the checkpoint's
>> progress.
>>
>> Best,
>> Vino
>>
>> Dian Fu <dian0511...@gmail.com> wrote on Wednesday, June 5, 2019 at 1:55 PM:
>>
>>> Hi Vino,
>>>
>>> Thanks a lot for your reply.
>>>
>>> > 1) When, why and how to judge that the memory is exhausted?
>>>
>>> My point here is that the local aggregate operator can buffer the inputs
>>> in memory and send out the results AT ANY TIME, i.e. when the element count
>>> or the time interval reaches a pre-configured value, when the memory usage
>>> of the buffered elements reaches a configured value (supposing we can
>>> estimate the object size efficiently), or even when a checkpoint is
>>> triggered.
>>>
>>> > 2) If the local aggregate operator rarely needs to operate the state, what
>>> > do you think about fault tolerance?
>>>
>>> AbstractStreamOperator provides a method `prepareSnapshotPreBarrier`
>>> which can be used here to send out the results to the downstream when a
>>> checkpoint is triggered. Then fault tolerance can work well.
>>>
>>> Even if there were no such method available, we could still store the
>>> buffered elements or pre-aggregated results to state when a checkpoint is
>>> triggered. The state access will be much less than with the window operator,
>>> as only the elements not yet sent out when the checkpoint occurs have to be
>>> written to state. Suppose the checkpoint interval is 3 minutes and the
>>> trigger interval is 10 seconds; then only about "10/180" of the elements
>>> will be written to state.
>>>
>>>
>>> Thanks,
>>> Dian
>>>
>>>
>>> > On June 5, 2019, at 11:43 AM, Biao Liu <mmyy1...@gmail.com> wrote:
>>> >
>>> > Hi Vino,
>>> >
>>> > +1 for this feature. It's useful for data skew, and it could also reduce
>>> > the amount of shuffled data.
>>> >
>>> > I have some concerns about the API part. From my side, this feature should
>>> > be more of an improvement. I'm afraid the proposal is an overkill on the
>>> > API part. Many other systems support pre-aggregation as an optimization of
>>> > global aggregation. The optimization might be applied automatically or
>>> > manually, but with a simple API. The proposal introduces a series of
>>> > flexible local aggregation APIs. They could be independent of global
>>> > aggregation. It doesn't look like an improvement but introduces a lot of
>>> > features. I'm not sure if there is a bigger picture later. As of now the
>>> > API part looks a little heavy to me.
>>> >
>>> >
>>> > vino yang <yanghua1...@gmail.com> wrote on Wednesday, June 5, 2019 at 10:38 AM:
>>> >
>>> >> Hi Litree,
>>> >>
>>> >> From an implementation level, the localKeyBy API returns a general
>>> >> KeyedStream; you can call all the APIs which KeyedStream provides. We did
>>> >> not restrict its usage, although we could do so (for example, return a new
>>> >> stream object named LocalKeyedStream).
>>> >>
>>> >> However, to achieve the goal of local aggregation, it only makes sense to
>>> >> call the window API.
>>> >>
>>> >> Best,
>>> >> Vino
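For illustration, a minimal sketch of the usage pattern described above, in Java. The localKeyBy call is the API proposed in the design document and does not exist in vanilla Flink, so its exact shape here is an assumption and this will not compile against current releases; everything else is the standard DataStream window API.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> source =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

        source
                // Phase 1: pre-aggregate on the sender side. localKeyBy is the
                // proposed API (assumed to mirror keyBy's signature), so no shuffle
                // happens here.
                .localKeyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                // Phase 2: shuffle the (much smaller) partial results by key and
                // combine them into the final result.
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("local aggregation sketch");
    }
}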
>>> >> litree <lyuan...@126.com> wrote on Tuesday, June 4, 2019 at 10:41 PM:
>>> >>
>>> >>> Hi Vino,
>>> >>>
>>> >>> I have read your design. Something I want to know is the usage of these
>>> >>> new APIs. It looks like when I use localKeyBy, I must then use a window
>>> >>> operator to return a DataStream, and then use keyBy and another window
>>> >>> operator to get the final result?
>>> >>>
>>> >>> Thanks,
>>> >>> Litree
>>> >>>
>>> >>>
>>> >>> On 06/04/2019 17:22, vino yang wrote:
>>> >>> Hi Dian,
>>> >>>
>>> >>> Thanks for your reply.
>>> >>>
>>> >>> I know what you mean. However, if you think about it deeply, you will find
>>> >>> that your implementation needs to provide an operator which looks like a
>>> >>> window operator. You need to use state, receive an aggregation function
>>> >>> and specify the trigger time. It looks like a lightweight window operator,
>>> >>> right?
>>> >>>
>>> >>> We try to reuse the functions Flink provides and reduce complexity. IMO,
>>> >>> it is more user-friendly because users are familiar with the window API.
>>> >>>
>>> >>> Best,
>>> >>> Vino
>>> >>>
>>> >>>
>>> >>> Dian Fu <dian0511...@gmail.com> wrote on Tuesday, June 4, 2019 at 4:19 PM:
>>> >>>
>>> >>>> Hi Vino,
>>> >>>>
>>> >>>> Thanks a lot for starting this discussion. +1 to this feature as I think
>>> >>>> it will be very useful.
>>> >>>>
>>> >>>> Regarding using a window to buffer the input elements, personally I don't
>>> >>>> think it's a good solution for the following reasons:
>>> >>>> 1) As we know, WindowOperator stores the accumulated results in state;
>>> >>>> this is not necessary for a local aggregate operator.
>>> >>>> 2) In WindowOperator, each input element is accumulated into state. This
>>> >>>> is also not necessary for a local aggregate operator, and storing the
>>> >>>> input elements in memory is enough.
>>> >>>>
>>> >>>> Thanks,
>>> >>>> Dian
>>> >>>>
>>> >>>>> On June 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>> >>>>>
>>> >>>>> Hi Ken,
>>> >>>>>
>>> >>>>> Thanks for your reply.
>>> >>>>>
>>> >>>>> As I said before, we try to reuse Flink's state concept (for fault
>>> >>>>> tolerance and the "exactly-once" guarantee). So we did not consider a
>>> >>>>> cache.
>>> >>>>>
>>> >>>>> In addition, if we use Flink's state, the OOM-related issue is not a key
>>> >>>>> problem we need to consider.
>>> >>>>>
>>> >>>>> Best,
>>> >>>>> Vino
>>> >>>>>
>>> >>>>> Ken Krugler <kkrugler_li...@transpac.com> wrote on Tuesday, June 4, 2019 at 1:37 AM:
>>> >>>>>
>>> >>>>>> Hi all,
>>> >>>>>>
>>> >>>>>> Cascading implemented this "map-side reduce" functionality with an LLR
>>> >>>>>> cache.
>>> >>>>>>
>>> >>>>>> That worked well, as then the skewed keys would always be in the cache.
>>> >>>>>>
>>> >>>>>> The API let you decide the size of the cache, in terms of the number of
>>> >>>>>> entries.
>>> >>>>>>
>>> >>>>>> Having a memory limit would have been better for many of our use cases,
>>> >>>>>> though FWIR there's no good way to estimate the in-memory size of
>>> >>>>>> objects.
>>> >>>>>>
>>> >>>>>> — Ken
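The bounded-cache approach described above can be sketched independently of any framework. The class below is illustrative only (it is not Cascading's or Flink's API): partial sums are kept per key and flushed downstream once the number of distinct keys reaches a limit, so hot keys keep being collapsed in memory.

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Count-bounded pre-aggregation buffer in the spirit of the "map-side reduce"
 * cache described above. Names are made up for illustration.
 */
public class BoundedPreAggregator<K> {

    private final int maxEntries;
    private final BiConsumer<K, Long> downstream; // receives partial results
    private final Map<K, Long> partials = new HashMap<>();

    public BoundedPreAggregator(int maxEntries, BiConsumer<K, Long> downstream) {
        this.maxEntries = maxEntries;
        this.downstream = downstream;
    }

    /** Accumulate one element; skewed keys mostly hit the in-memory map. */
    public void add(K key, long value) {
        partials.merge(key, value, Long::sum);
        if (partials.size() >= maxEntries) {
            flush();
        }
    }

    /** Emit all partial results and clear the buffer. */
    public void flush() {
        partials.forEach(downstream);
        partials.clear();
    }
}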
>>> >>>>>>
>>> >>>>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Hi Piotr,
>>> >>>>>>>
>>> >>>>>>> The localKeyBy API returns an instance of KeyedStream (we just added an
>>> >>>>>>> inner flag to identify the local mode), which Flink has provided
>>> >>>>>>> before. Users can call all the APIs (especially the *window* APIs)
>>> >>>>>>> which KeyedStream provides.
>>> >>>>>>>
>>> >>>>>>> So if users want to use local aggregation, they should call the window
>>> >>>>>>> API to build a local window. That means users should (or say "can")
>>> >>>>>>> specify the window length and other information based on their needs.
>>> >>>>>>>
>>> >>>>>>> I think you described an idea different from ours. We did not try to
>>> >>>>>>> react after some predefined threshold is triggered. We tend to give
>>> >>>>>>> users the discretion to make these decisions.
>>> >>>>>>>
>>> >>>>>>> Our design idea tends to reuse the concepts and functions Flink
>>> >>>>>>> provides, like state and window (IMO, we do not need to worry about OOM
>>> >>>>>>> and the issues you mentioned).
>>> >>>>>>>
>>> >>>>>>> Best,
>>> >>>>>>> Vino
>>> >>>>>>>
>>> >>>>>>> Piotr Nowojski <pi...@ververica.com> wrote on Monday, June 3, 2019 at 4:30 PM:
>>> >>>>>>>
>>> >>>>>>>> Hi,
>>> >>>>>>>>
>>> >>>>>>>> +1 for the idea from my side. I've even attempted to add a similar
>>> >>>>>>>> feature quite some time ago, but didn't get enough traction [1].
>>> >>>>>>>>
>>> >>>>>>>> I've read through your document and I couldn't find it mentioned
>>> >>>>>>>> anywhere: when should the pre-aggregated result be emitted down the
>>> >>>>>>>> stream? I think that's one of the most crucial decisions, since a
>>> >>>>>>>> wrong decision here can lead to decreased performance or to an
>>> >>>>>>>> explosion of memory/state consumption (both with bounded and unbounded
>>> >>>>>>>> data streams). For streaming it can also lead to increased latency.
>>> >>>>>>>>
>>> >>>>>>>> Since this is also a decision that's impossible to make automatically
>>> >>>>>>>> and perfectly reliably, first and foremost I would expect this to be
>>> >>>>>>>> configurable via the API. With maybe some predefined triggers, like on
>>> >>>>>>>> watermark (for windowed operations), on checkpoint barrier (to
>>> >>>>>>>> decrease state size?), on element count, maybe on memory usage (much
>>> >>>>>>>> easier to estimate with known/predefined types, like in SQL)… and with
>>> >>>>>>>> some option to implement a custom trigger.
>>> >>>>>>>>
>>> >>>>>>>> Also, what would work best would be to have some form of memory
>>> >>>>>>>> consumption priority. For example, if we are running out of memory for
>>> >>>>>>>> HashJoin/final aggregation, instead of spilling to disk or crashing
>>> >>>>>>>> the job with an OOM it would probably be better to prune/dump the
>>> >>>>>>>> pre/local aggregation state. But that's another story.
>>> >>>>>>>>
>>> >>>>>>>> [1] https://github.com/apache/flink/pull/4626
>>> >>>>>>>>
>>> >>>>>>>> Piotrek
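One of the predefined triggers mentioned above, flushing on the checkpoint barrier, is what the earlier prepareSnapshotPreBarrier suggestion amounts to. A rough sketch of such an operator follows; only AbstractStreamOperator, OneInputStreamOperator and prepareSnapshotPreBarrier are actual Flink APIs, while the class name, buffer and element-count threshold are made up for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Sketch of a local-aggregate operator that keeps partial sums in plain memory
 * and flushes them downstream right before the checkpoint barrier, so nothing
 * has to be written to state.
 */
public class LocalSumOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int MAX_BUFFERED_KEYS = 10_000;

    // Partial results; never checkpointed because they are flushed pre-barrier.
    private transient Map<String, Long> partials;

    @Override
    public void open() throws Exception {
        super.open();
        partials = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> record) {
        Tuple2<String, Long> in = record.getValue();
        partials.merge(in.f0, in.f1, Long::sum);
        if (partials.size() >= MAX_BUFFERED_KEYS) {
            flush(); // element-count trigger, as discussed in the thread
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        flush(); // checkpoint-barrier trigger: state stays empty
    }

    private void flush() {
        for (Map.Entry<String, Long> e : partials.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
        partials.clear();
    }
}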
>>> >>>>>>>>
>>> >>>>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>> Excited and big +1 for this feature.
>>> >>>>>>>>>
>>> >>>>>>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Monday, June 3, 2019 at 3:37 PM:
>>> >>>>>>>>>
>>> >>>>>>>>>> Nice feature.
>>> >>>>>>>>>> Looking forward to having it in Flink.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Regards,
>>> >>>>>>>>>> Xiaogang
>>> >>>>>>>>>>
>>> >>>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Monday, June 3, 2019 at 3:31 PM:
>>> >>>>>>>>>>
>>> >>>>>>>>>>> Hi all,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> As we mentioned at some conferences, such as Flink Forward SF 2019
>>> >>>>>>>>>>> and QCon Beijing 2019, our team has implemented "local aggregation"
>>> >>>>>>>>>>> in our internal Flink fork. This feature can effectively alleviate
>>> >>>>>>>>>>> data skew.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Currently, keyed streams are widely used to perform aggregating
>>> >>>>>>>>>>> operations (e.g., reduce, sum and window) on the elements that have
>>> >>>>>>>>>>> the same key. When executed at runtime, the elements with the same
>>> >>>>>>>>>>> key will be sent to and aggregated by the same task.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> The performance of these aggregating operations is very sensitive
>>> >>>>>>>>>>> to the distribution of keys. In cases where the distribution of
>>> >>>>>>>>>>> keys follows a power law, the performance will be significantly
>>> >>>>>>>>>>> degraded. Even more unluckily, increasing the degree of parallelism
>>> >>>>>>>>>>> does not help when a task is overloaded by a single key.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Local aggregation is a widely adopted method to reduce the
>>> >>>>>>>>>>> performance degradation caused by data skew. We can decompose the
>>> >>>>>>>>>>> aggregating operations into two phases. In the first phase, we
>>> >>>>>>>>>>> aggregate the elements of the same key at the sender side to obtain
>>> >>>>>>>>>>> partial results. Then in the second phase, these partial results
>>> >>>>>>>>>>> are sent to receivers according to their keys and are combined to
>>> >>>>>>>>>>> obtain the final result. Since the number of partial results
>>> >>>>>>>>>>> received by each receiver is limited by the number of senders, the
>>> >>>>>>>>>>> imbalance among receivers can be reduced. Besides, by reducing the
>>> >>>>>>>>>>> amount of transferred data, the performance can be further improved.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> The design documentation is here:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Any comments and feedback are welcome and appreciated.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Best,
>>> >>>>>>>>>>> Vino
>>> >>>>>>
>>> >>>>>> --------------------------
>>> >>>>>> Ken Krugler
>>> >>>>>> +1 530-210-6378
>>> >>>>>> http://www.scaleunlimited.com
>>> >>>>>> Custom big data solutions & training
>>> >>>>>> Flink, Solr, Hadoop, Cascading & Cassandra
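To make the two-phase decomposition from the original announcement concrete, here is a tiny, framework-free sketch (all class and method names are made up for illustration): each sender pre-aggregates its own elements, so a receiver combines at most one partial result per key per sender, no matter how skewed the raw input is.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseAggregationSketch {

    /** Phase 1 (sender side): aggregate locally, producing at most one entry per key. */
    static Map<String, Long> preAggregate(List<String> elements) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : elements) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    /** Phase 2 (receiver side): combine the partial results from all senders. */
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // The skewed key "a": sender 1 collapses four elements into one partial result.
        Map<String, Long> sender1 = preAggregate(List.of("a", "a", "a", "a", "b"));
        Map<String, Long> sender2 = preAggregate(List.of("a", "c"));

        // The receiver sees at most (number of senders) partial results per key.
        System.out.println(combine(List.of(sender1, sender2))); // a=5, b=1, c=1 (in some order)
    }
}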