Hi Vino,

> So if users want to use local aggregation, they should call the window API
> to build a local window, which means users should (or rather, can) specify
> the window length and other information based on their needs.
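If I understand the proposal correctly, the usage would look roughly like the sketch below, where localKeyBy is the proposed API and everything else (the window length, the sum logic, the `input` stream) is only illustrative, not something the design document prescribes:

    // input: DataStream<Tuple2<String, Long>> of (key, value) pairs
    DataStream<Tuple2<String, Long>> result = input
        // proposed API: keys the stream locally, without a network shuffle
        .localKeyBy(v -> v.f0)
        // the user decides how long to pre-aggregate, e.g. 5-second tumbling windows
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        // partial (local) aggregation per key, per subtask
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
        // second phase: shuffle only the partial results and combine them per key
        .keyBy(v -> v.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

In that reading, the local window is what decides when partial results are emitted downstream.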
That sounds OK to me. It would have to be run past some of the API people in the community, though.

Piotrek

> On 4 Jun 2019, at 10:19, Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Vino,
>
> Thanks a lot for starting this discussion. +1 to this feature, as I think it will be very useful.
>
> Regarding using a window to buffer the input elements, personally I don't think it's a good solution, for the following reasons:
> 1) As we know, WindowOperator stores the accumulated results in state, which is not necessary for a local aggregate operator.
> 2) In WindowOperator, each input element is accumulated into state. This is also not necessary for a local aggregate operator; keeping the input elements in memory is enough.
>
> Thanks,
> Dian
>
>> On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>
>> Hi Ken,
>>
>> Thanks for your reply.
>>
>> As I said before, we try to reuse Flink's state concept (for fault tolerance and to guarantee exactly-once semantics), so we did not consider a cache.
>>
>> In addition, if we use Flink's state, the OOM-related issues are not a key problem we need to consider.
>>
>> Best,
>> Vino
>>
>> On Tue, Jun 4, 2019 at 1:37 AM Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>
>>> Hi all,
>>>
>>> Cascading implemented this "map-side reduce" functionality with an LRU cache.
>>>
>>> That worked well, as the skewed keys would then always be in the cache.
>>>
>>> The API let you decide the size of the cache, in terms of the number of entries.
>>>
>>> Having a memory limit would have been better for many of our use cases, though from what I recall there's no good way to estimate the in-memory size of objects.
>>>
>>> — Ken
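(As an aside, the cache-based variant Ken describes could be as simple as the sketch below: a bounded, access-ordered map of partial aggregates that flushes evicted entries downstream, so the hot keys of a skewed distribution tend to stay cached. This is only an illustration of that approach, not part of the current proposal; the class and its names are made up for the example.)

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;

    /** Bounded map of partial sums; evicted entries are emitted as partial aggregates. */
    class LocalAggregateCache<K> {

        private final Map<K, Long> cache;

        LocalAggregateCache(int maxEntries, BiConsumer<K, Long> emit) {
            // access-ordered LinkedHashMap evicts the least-recently-used key first
            this.cache = new LinkedHashMap<K, Long>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
                    if (size() > maxEntries) {
                        emit.accept(eldest.getKey(), eldest.getValue()); // flush partial result
                        return true;
                    }
                    return false;
                }
            };
        }

        void add(K key, long value) {
            cache.merge(key, value, Long::sum);   // pre-aggregate in place
        }

        void flush(BiConsumer<K, Long> emit) {    // e.g. at a checkpoint or end of input
            cache.forEach(emit);
            cache.clear();
        }
    }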
>>>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
>>>>
>>>> Hi Piotr,
>>>>
>>>> The localKeyBy API returns an instance of KeyedStream (we just added an inner flag to identify the local mode), which Flink has provided before. Users can call all the APIs (especially the *window* APIs) that KeyedStream provides.
>>>>
>>>> So if users want to use local aggregation, they should call the window API to build a local window, which means users should (or rather, can) specify the window length and other information based on their needs.
>>>>
>>>> I think you described an idea different from ours. We do not try to react when some predefined threshold is triggered. We tend to give users the discretion to make such decisions.
>>>>
>>>> Our design tends to reuse concepts and functions that Flink already provides, such as state and windows (IMO, we do not need to worry about OOM and the issues you mentioned).
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> On Mon, Jun 3, 2019 at 4:30 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> +1 for the idea from my side. I even attempted to add a similar feature quite some time ago, but it didn't get enough traction [1].
>>>>>
>>>>> I've read through your document and I couldn't find it mentioned anywhere: when should the pre-aggregated result be emitted down the stream? I think that's one of the most crucial decisions, since a wrong decision here can lead to a decrease in performance or to an explosion of memory/state consumption (with both bounded and unbounded data streams). For streaming it can also lead to increased latency.
>>>>>
>>>>> Since this is also a decision that's impossible to make automatically in a perfectly reliable way, first and foremost I would expect this to be configurable via the API, with maybe some predefined triggers, like on watermark (for windowed operations), on checkpoint barrier (to decrease state size?), on element count, maybe on memory usage (much easier to estimate with known/predefined types, like in SQL)… and with some option to implement a custom trigger.
>>>>>
>>>>> Also, what would work best would be to have some form of memory consumption priority. For example, if we are running out of memory for the HashJoin/final aggregation, instead of spilling to disk or crashing the job with an OOM it would probably be better to prune/dump the pre/local aggregation state. But that's another story.
>>>>>
>>>>> [1] https://github.com/apache/flink/pull/4626
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
>>>>>>
>>>>>> Excited, and a big +1 for this feature.
>>>>>>
>>>>>> On Mon, Jun 3, 2019 at 3:37 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>>>>>>
>>>>>>> Nice feature. Looking forward to having it in Flink.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Xiaogang
>>>>>>>
>>>>>>> On Mon, Jun 3, 2019 at 3:31 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> As we mentioned at some conferences, such as Flink Forward SF 2019 and QCon Beijing 2019, our team has implemented "local aggregation" in our internal Flink fork. This feature can effectively alleviate data skew.
>>>>>>>>
>>>>>>>> Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on elements that have the same key. When executed at runtime, the elements with the same key are sent to and aggregated by the same task.
>>>>>>>>
>>>>>>>> The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, performance is significantly degraded. Worse still, increasing the degree of parallelism does not help when a task is overloaded by a single key.
>>>>>>>>
>>>>>>>> Local aggregation is a widely adopted method to reduce the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements with the same key at the sender side to obtain partial results. Then, in the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, by reducing the amount of transferred data, performance can be further improved.
>>>>>>>>
>>>>>>>> The design documentation is here:
>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>>>>>>>
>>>>>>>> Any comments and feedback are welcome and appreciated.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Vino
>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
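To make the emit-trigger question above a bit more concrete: a count-based flush for the local phase could look roughly like the sketch below. This is only an illustration under the assumption of a flatMap-based local operator (the class name and threshold are made up for the example), not the proposed implementation. Note that the in-memory buffer here is not checkpointed; a checkpoint- or watermark-based trigger would flush in the corresponding callbacks instead.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    /** Buffers partial sums per key and flushes them downstream every N input elements. */
    public class CountTriggeredLocalSum
            extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private final int flushThreshold;             // predefined trigger: element count
        private transient Map<String, Long> buffer;
        private transient int seen;

        public CountTriggeredLocalSum(int flushThreshold) {
            this.flushThreshold = flushThreshold;
        }

        @Override
        public void open(Configuration parameters) {
            buffer = new HashMap<>();
            seen = 0;
        }

        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
            buffer.merge(value.f0, value.f1, Long::sum);   // local (partial) aggregation
            if (++seen >= flushThreshold) {
                for (Map.Entry<String, Long> e : buffer.entrySet()) {
                    out.collect(Tuple2.of(e.getKey(), e.getValue()));
                }
                buffer.clear();
                seen = 0;
            }
        }
    }

    // Second phase: shuffle the partial results and combine them per key, e.g.
    //   input.flatMap(new CountTriggeredLocalSum(10_000)).keyBy(v -> v.f0).sum(1);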