Hi Ken,

Thanks for your reply.
As I said before, we try to reuse Flink's state concept (fault tolerance
and guaranteed "Exactly-Once" semantics), so we did not consider a cache.
In addition, if we use Flink's state, the OOM-related issue is not a key
problem we need to consider.

Best,
Vino

Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:

> Hi all,
>
> Cascading implemented this "map-side reduce" functionality with an LRU
> cache.
>
> That worked well, as the skewed keys would then always be in the cache.
>
> The API let you decide the size of the cache, in terms of the number of
> entries.
>
> Having a memory limit would have been better for many of our use cases,
> though as far as I recall there's no good way to estimate the in-memory
> size of objects.
>
> — Ken
>
> > On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> >
> > Hi Piotr,
> >
> > The localKeyBy API returns an instance of KeyedStream (we just added
> > an inner flag to identify the local mode), which Flink has provided
> > before. Users can call all the APIs (especially *window* APIs) that
> > KeyedStream provides.
> >
> > So if users want to use local aggregation, they should call the window
> > API to build a local window, which means users should (or rather, can)
> > specify the window length and other information based on their needs.
> >
> > I think you described an idea different from ours. We do not try to
> > react after some predefined threshold is triggered; we tend to give
> > users the discretion to make these decisions.
> >
> > Our design tends to reuse the concepts and functions Flink already
> > provides, like state and windows (IMO, we do not need to worry about
> > OOM and the issues you mentioned).
> >
> > Best,
> > Vino
> >
> > Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
> >
> >> Hi,
> >>
> >> +1 for the idea from my side. I even attempted to add a similar
> >> feature quite some time ago, but it didn't get enough traction [1].
> >>
> >> I've read through your document and I couldn't find it mentioned
> >> anywhere: when should the pre-aggregated result be emitted down the
> >> stream? I think that's one of the most crucial decisions, since a
> >> wrong decision here can lead to a decrease in performance or to an
> >> explosion of memory/state consumption (with both bounded and
> >> unbounded data streams). For streaming it can also lead to increased
> >> latency.
> >>
> >> Since this is also a decision that's impossible to make automatically
> >> in a perfectly reliable way, first and foremost I would expect this
> >> to be configurable via the API, with maybe some predefined triggers:
> >> on watermark (for windowed operations), on checkpoint barrier (to
> >> decrease state size?), on element count, maybe on memory usage (much
> >> easier to estimate with known/predefined types, like in SQL)… and
> >> with some option to implement a custom trigger.
> >>
> >> Also, what would work best would be to have some form of memory
> >> consumption priority. For example, if we are running out of memory
> >> for the HashJoin/final aggregation, instead of spilling to disk or
> >> crashing the job with an OOM it would probably be better to
> >> prune/dump the pre/local aggregation state. But that's another story.
> >>
> >> [1] https://github.com/apache/flink/pull/4626
> >>
> >> Piotrek
> >>
> >>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> >>>
> >>> Excited, and a big +1 for this feature.
> >>>
> >>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
> >>>
> >>>> Nice feature.
> >>>> Looking forward to having it in Flink.
> >>>>
> >>>> Regards,
> >>>> Xiaogang
> >>>>
> >>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> As we mentioned at some conferences, such as Flink Forward SF 2019
> >>>>> and QCon Beijing 2019, our team has implemented "local aggregation"
> >>>>> in our internal Flink fork. This feature can effectively alleviate
> >>>>> data skew.
> >>>>>
> >>>>> Currently, keyed streams are widely used to perform aggregating
> >>>>> operations (e.g., reduce, sum and window) on the elements that have
> >>>>> the same key. When executed at runtime, the elements with the same
> >>>>> key are sent to and aggregated by the same task.
> >>>>>
> >>>>> The performance of these aggregating operations is very sensitive
> >>>>> to the distribution of keys. In cases where the distribution of
> >>>>> keys follows a power law, performance is significantly degraded.
> >>>>> Worse still, increasing the degree of parallelism does not help
> >>>>> when a task is overloaded by a single key.
> >>>>>
> >>>>> Local aggregation is a widely adopted method to reduce the
> >>>>> performance degradation caused by data skew. We can decompose the
> >>>>> aggregating operations into two phases. In the first phase, we
> >>>>> aggregate the elements of the same key at the sender side to obtain
> >>>>> partial results. Then, in the second phase, these partial results
> >>>>> are sent to receivers according to their keys and are combined to
> >>>>> obtain the final result. Since the number of partial results
> >>>>> received by each receiver is limited by the number of senders, the
> >>>>> imbalance among receivers is reduced. Besides, by reducing the
> >>>>> amount of transferred data, the performance can be further
> >>>>> improved.
> >>>>>
> >>>>> The design documentation is here:
> >>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >>>>>
> >>>>> Any comments and feedback are welcome and appreciated.
> >>>>>
> >>>>> Best,
> >>>>> Vino
> >>>>>
> >>>>
> >>
> >>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
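
For readers who want to see what the proposal discussed above would look like in user code, here is a minimal sketch based on Vino's description. localKeyBy is the API under discussion (it does not exist in Flink today) and is assumed to return a regular KeyedStream, so the existing window API decides when partial results are emitted. The source data, window size and field positions are invented for illustration.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class LocalAggregationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input: (key, count) pairs with a skewed key distribution.
        DataStream<Tuple2<String, Long>> source =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 1L), Tuple2.of("b", 1L));

        // Phase 1 (proposed API): pre-aggregate on the sender side, no shuffle yet.
        // localKeyBy is hypothetical here; per the thread it returns a KeyedStream,
        // so the user picks a local window that controls when partial results leave.
        DataStream<Tuple2<String, Long>> partial = source
                .localKeyBy(0)                    // hypothetical, proposed in this thread
                .timeWindow(Time.seconds(5))      // user-chosen local window
                .sum(1);                          // partial sums per sender

        // Phase 2 (existing API): shuffle the partial results by key and combine them.
        partial
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1)
                .print();

        env.execute("Local aggregation sketch");
    }
}

Because the local window bounds how long partial results are buffered on each sender, the window itself acts as the emission trigger Piotr asks about, and it also caps the per-sender state.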
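
For contrast, the cache-based "map-side reduce" that Ken describes from Cascading can be sketched with today's DataStream API as a plain FlatMapFunction holding an entry-count-bounded LRU map. The class name, cache size and (key, count) types are invented for illustration, and note Vino's caveat: this cache lives outside Flink's checkpointed state, so it does not get the exactly-once guarantees that a state-backed implementation would.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Illustrative only: combines (key, count) pairs on the sender side in an
 * LRU cache bounded by number of entries. Hot (skewed) keys are accessed
 * often, so they stay resident and keep being combined locally; when the
 * cache overflows, the least-recently-used partial sum is emitted downstream.
 */
public class BoundedLocalSum implements FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int MAX_ENTRIES = 10_000; // cache size in entries, as in Cascading's API

    // Access-ordered LinkedHashMap gives simple LRU behavior. Not checkpointed:
    // on failure the buffered partial sums are lost (Vino's objection).
    private transient LinkedHashMap<String, Long> cache;

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) {
        if (cache == null) {
            cache = new LinkedHashMap<>(MAX_ENTRIES, 0.75f, true);
        }

        // Combine locally with any partial sum already cached for this key.
        cache.merge(value.f0, value.f1, Long::sum);

        // Over the limit: flush the least-recently-used entry as a partial result.
        if (cache.size() > MAX_ENTRIES) {
            Map.Entry<String, Long> eldest = cache.entrySet().iterator().next();
            out.collect(Tuple2.of(eldest.getKey(), eldest.getValue()));
            cache.remove(eldest.getKey());
        }
        // A real operator would also flush the remaining entries on close()
        // or on checkpoint; that is omitted from this sketch.
    }
}

Downstream, an ordinary keyBy plus sum over the emitted partial results would then produce the final totals.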