Hi Kurt and Piotrek, Thanks for your comments.
I agree that we can provide a better abstraction to be compatible with two different implementations. First of all, I think we should consider what kind of scenarios we need to support at the *API* level. We have some use cases which need a customized aggregation through KeyedProcessFunction (in the usage of our localKeyBy.window they can use ProcessWindowFunction). Shall we support these flexible use scenarios? Best, Vino Kurt Young <ykt...@gmail.com> wrote on Tuesday, June 18, 2019 at 8:37 PM: > Hi Piotr, > > Thanks for joining the discussion. Making "local aggregation" abstract enough > sounds good to me; we could > implement and verify alternative solutions for use cases of local > aggregation. Maybe we will find both solutions > are appropriate for different scenarios. > > Starting from a simple one sounds like a practical way to go. What do you think, > Vino? > > Best, > Kurt > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com> > wrote: > > > Hi Kurt and Vino, > > > > I think there is a trade-off that we need to consider for the local > > aggregation. > > > > Generally speaking, I would agree with Kurt about local aggregation/pre-aggregation > > not using Flink's state and instead flushing the operator on a checkpoint. > > Network IO is usually cheaper compared to disk IO. This has, however, a couple > > of issues: > > 1. It can explode the number of in-flight records during checkpoint barrier > > alignment, making checkpointing slower and decreasing the actual > throughput. > > 2. It trades disk IO on the local aggregation machine for CPU (and > > disk IO in the case of RocksDB) on the final aggregation machine. This is > > fine, as long as there is no huge data skew. If there is only a handful of (or > > even one single) hot keys, it might be better to keep the persistent > state > > in the LocalAggregationOperator to offload the final aggregation as much as > > possible. > > 3. With frequent checkpointing, local aggregation effectiveness would > > degrade.
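Piotr's second point, that the fan-in at the final aggregation is bounded by the number of senders, can be illustrated with a small simulation (plain Python, not Flink APIs; all names here are illustrative):

```python
from collections import defaultdict

# Illustrative simulation (not Flink code): with local pre-aggregation,
# the final aggregator receives at most one partial result per distinct
# key per sender and per flush, no matter how skewed the raw stream is.

def local_pre_aggregate(records):
    """One sender's buffer: combine records per key, emit partials on flush."""
    buffer = defaultdict(int)
    for key, value in records:
        buffer[key] += value
    return list(buffer.items())  # one partial per distinct key

def final_aggregate(partials):
    result = defaultdict(int)
    for key, value in partials:
        result[key] += value
    return dict(result)

# A heavily skewed stream: key "hot" dominates, spread over 4 senders.
senders = [[("hot", 1)] * 1000 + [("cold", 1)] for _ in range(4)]

partials = [p for sender in senders for p in local_pre_aggregate(sender)]
result = final_aggregate(partials)

# 4004 raw records shrink to 8 partials crossing the "network".
print(len(partials))   # 8
print(result)          # {'hot': 4000, 'cold': 4}
```

With a single hot key the final task still does all the merging, which is why Piotr notes that offloading as much as possible to the local side may then be preferable.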
> > > > I assume Kurt is correct that in your use cases a stateless operator was > > behaving better, but I could easily see other use cases as well. For > > example, someone is already using RocksDB, and his job is bottlenecked on a > > single window operator instance because of the data skew. In that case, > > stateful local aggregation would probably be a better choice. > > > > Because of that, I think we should eventually provide both versions, and in > > the initial version we should at least make the "local aggregation engine" > > abstract enough that one could easily provide a different implementation > > strategy. > > > > Piotrek > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote: > > > > > > Hi, > > > > > > For the trigger, it depends on what operator we want to use under the > > API. > > > If we choose to use the window operator, > > > we should also use the window's trigger. However, I also think reusing the window > > > operator for this scenario may not be > > > the best choice. The reasons are the following: > > > > > > 1. As a lot of people have already pointed out, the window operator relies heavily on state, > > > and it will definitely affect performance. You can > > > argue that one can use a heap-based state backend, but this will introduce > > > extra coupling. Especially, we have a chance to > > > design a purely stateless operator. > > > 2. The window operator is *the most* complicated operator Flink currently > > > has. Maybe we only need to pick a subset of the > > > window operator to achieve the goal, but once the user wants to have a > > deep > > > look at the localAggregation operator, it's still > > > hard to find out what's going on under the window operator. For > > simplicity, > > > I would also recommend we introduce a dedicated > > > lightweight operator, which is also much easier for a user to learn and use.
> > > > > > For your question about increasing the burden in > > > `StreamOperator::prepareSnapshotPreBarrier()`, the only thing this > > function > > > needs > > > to do is output all the partial results; it's a purely CPU workload, not > > > introducing any IO. I want to point out that even if we have this > > > cost, we reduced another barrier alignment cost of the operator, which is the > > > sync flush stage of the state, if you introduced state. This > > > flush will actually introduce disk IO, and I think it's worth > > exchanging > > > this cost for a purely CPU workload. And we do have some > > > observations about these two behaviors (as I said before, we actually > > > implemented both solutions): the stateless one actually performs > > > better both in performance and barrier alignment time. > > > > > > Best, > > > Kurt > > > > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com> > wrote: > > > > > >> Hi Kurt, > > >> > > >> Thanks for your example. Now, it looks clearer to me. > > >> > > >> From your example code snippet, I saw the localAggregate API has three > > >> parameters: > > >> > > >> 1. key field > > >> 2. PartitionAvg > > >> 3. CountTrigger: Does this trigger come from the window package? > > >> > > >> I will compare our design and yours at the API and operator levels: > > >> > > >> *From the API level:* > > >> > > >> As I replied to @dianfu in the old email thread, [1] the Window API can > > >> provide the second and the third parameter right now. > > >> > > >> If you reuse a specific interface or class, such as *Trigger* or > > >> *CountTrigger* provided by the window package, but do not use the window API, > > >> it's not reasonable. > > >> And if you do not reuse these interfaces or classes, you would need to > > >> introduce more things that nevertheless look similar to the things > > >> provided by the window package.
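The flush-before-the-barrier behavior that Kurt describes can be sketched as a toy operator (hypothetical class and method names, not the actual Flink `StreamOperator` API): partials live in a plain in-memory map, and emptying the map right before the barrier leaves nothing to snapshot.

```python
# Illustrative sketch (hypothetical names, not the Flink operator API):
# a pre-aggregation operator that keeps a plain dict and flushes it
# downstream in prepareSnapshotPreBarrier(), so the checkpoint itself
# has no operator state to persist.

class LocalSumOperator:
    def __init__(self, emit):
        self.buffer = {}   # raw heap buffer, not Flink-managed state
        self.emit = emit   # downstream collector

    def process_element(self, key, value):
        self.buffer[key] = self.buffer.get(key, 0) + value

    def prepare_snapshot_pre_barrier(self):
        # Flush partials before the barrier: pure CPU work, no disk IO,
        # and afterwards there is nothing left to checkpoint.
        for key, value in self.buffer.items():
            self.emit((key, value))
        self.buffer.clear()

out = []
op = LocalSumOperator(out.append)
for record in [("a", 1), ("a", 2), ("b", 5)]:
    op.process_element(*record)

op.prepare_snapshot_pre_barrier()   # checkpoint barrier arrives
print(out)          # [('a', 3), ('b', 5)]
print(op.buffer)    # {} -- nothing to snapshot
```

The correctness argument in the thread is that every buffered record is either flushed before the barrier or replayed after recovery, so exactly-once is preserved without operator state.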
> > >> > > >> The window package has provided several types of windows and many > > >> triggers, and lets users customize them. What's more, users are more > > familiar > > >> with the Window API. > > >> > > >> This is the reason why we just provide the localKeyBy API and reuse the > > window > > >> API. It reduces unnecessary components such as triggers and the > > mechanism > > >> of buffering (based on count or time). > > >> And it has clear and easy-to-understand semantics. > > >> > > >> *From the operator level:* > > >> > > >> We reused the window operator, so we can get all the benefits from state and > > >> checkpoints. > > >> > > >> From your design, you said the operator under the localAggregate API is a > > >> *stateless* operator. IMO, it still has state; it is just not Flink-managed > > >> state. > > >> About the memory buffer (I think it's still not very clear; if you have > > >> time, can you give more detailed information or answer my questions), I > > have > > >> some questions: > > >> > > >> - if it is just a raw JVM heap memory buffer, how do you support fault > > >> tolerance if the job is configured with an EXACTLY-ONCE semantic guarantee? > > >> - if you think the memory buffer (non-Flink state) has better > > >> performance: in our design, users can also configure the HEAP state backend > > to > > >> provide performance close to your mechanism. > > >> - `StreamOperator::prepareSnapshotPreBarrier()` is related to the timing > > of > > >> the snapshot. IMO, the flush action should be a synchronized action? (if > > >> not, > > >> please point out my mistake) I still think we should not depend on the > > >> timing of the checkpoint. Checkpoint-related operations are inherently > > >> performance sensitive; we should not increase their burden anymore. Our > > >> implementation is based on the mechanism of Flink's checkpoint, which can > > >> benefit from the async snapshot and incremental checkpoint.
IMO, the > > >> performance is not a problem, and we also did not find any performance > > >> issue > > >> in our production. > > >> > > >> [1]: > > >> > > >> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308 > > >> > > >> Best, > > >> Vino > > >> > > >> Kurt Young <ykt...@gmail.com> wrote on Tuesday, June 18, 2019 at 2:27 PM: > > >> > > >>> Yeah, sorry for not expressing myself clearly. I will try to provide > > more > > >>> details to make sure we are on the same page. > > >>> > > >>> For the DataStream API, it shouldn't be optimized automatically. You have > > to > > >>> explicitly call an API to do local aggregation, > > >>> as well as specify the trigger policy of the local aggregation. Take average > > for > > >>> example; the user program may look like this (just a draft): > > >>> > > >>> assuming the input type is DataStream<Tuple2<String, Int>> > > >>> > > >>> ds.localAggregate( > > >>> 0, // The local key, which > > >> is > > >>> the String from Tuple2 > > >>> PartitionAvg(1), // The partial aggregation > > >>> function, produces Tuple2<Long, Int>, indicating sum and count > > >>> CountTrigger.of(1000L) // Trigger policy; note this should be > > >>> best effort, and can also be composed with a time-based or memory-size-based > > >>> trigger > > >>> ) // The return type is the > > >> local > > >>> aggregate Tuple2<String, Tuple2<Long, Int>> > > >>> .keyBy(0) // Further keyBy it with the > > >> required > > >>> key > > >>> .aggregate(1) // This will merge all the > > partial > > >>> results and get the final average. > > >>> > > >>> (This is only a draft, only trying to explain what it looks like.) > > >>> > > >>> The local aggregate operator can be stateless; we can keep a memory > > >> buffer > > >>> or another efficient data structure to improve the aggregation performance. > > >>> > > >>> Let me know if you have any other questions.
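The semantics of the drafted average example can be mimicked outside Flink (the `localAggregate` API above is only a draft; this Python simulation and its function names are illustrative): the partial aggregation carries `(sum, count)` per key, a count trigger flushes periodically, and the final stage merges partials by adding sums and counts.

```python
from collections import defaultdict

# Simulation of the drafted average example: the partial aggregation
# keeps (sum, count) per key and a best-effort count trigger flushes
# every `trigger_count` input elements; a final flush drains the rest.

def local_aggregate(stream, trigger_count):
    buffer = defaultdict(lambda: [0, 0])   # key -> [sum, count]
    seen = 0
    for key, value in stream:
        buffer[key][0] += value
        buffer[key][1] += 1
        seen += 1
        if seen % trigger_count == 0:      # count trigger fires
            for k, (s, c) in buffer.items():
                yield (k, (s, c))
            buffer.clear()
    for k, (s, c) in buffer.items():       # final flush (e.g. at a barrier)
        yield (k, (s, c))

def final_average(partials):
    acc = defaultdict(lambda: [0, 0])
    for key, (s, c) in partials:           # merge partials: sums and counts add
        acc[key][0] += s
        acc[key][1] += c
    return {k: s / c for k, (s, c) in acc.items()}

stream = [("x", v) for v in (1, 2, 3, 4)] + [("y", 10)]
result = final_average(local_aggregate(stream, trigger_count=2))
print(result)   # {'x': 2.5, 'y': 10.0}
```

This is why an intermediate result type matters: the partial `(sum, count)` differs from both the input type and the final average.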
> > >>> > > >>> Best, > > >>> Kurt > > >>> > > >>> > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com> > > wrote: > > >>> > > >>>> Hi Kurt, > > >>>> > > >>>> Thanks for your reply. > > >>>> > > >>>> Actually, I am not against you raising your design. > > >>>> > > >>>> From your earlier description, I could only infer that your high-level > > >>>> implementation is about SQL and the optimization is internal to the API. > > Is it > > >>>> automatic? How does the user configure the trigger for > > >>>> pre-aggregation? > > >>>> > > >>>> Maybe after I get more information, it will sound more reasonable. > > >>>> > > >>>> IMO, first of all, it would be better to make your user interface > > >>> concrete; > > >>>> it's the basis of the discussion. > > >>>> > > >>>> For example, can you give an example code snippet to show how to > > >>> help > > >>>> users handle data skew in jobs built with the DataStream > > >>>> API? > > >>>> > > >>>> If you give more details, we can discuss further. I think if one > > >>> design > > >>>> introduces an exact interface and another does not, > > >>>> the implementations have an obvious difference. For example, we > > introduce > > >>> an > > >>>> exact API in DataStream named localKeyBy; for the pre-aggregation we > > >>> need > > >>>> to define the trigger mechanism of local aggregation, so we found that > > reusing the > > >>>> window API and operator is a good choice. This is a reasoning link > > from > > >>>> design to implementation. > > >>>> > > >>>> What do you think? > > >>>> > > >>>> Best, > > >>>> Vino > > >>>> > > >>>> > > >>>> Kurt Young <ykt...@gmail.com> wrote on Tuesday, June 18, 2019 at 11:58 AM: > > >>>> > > >>>>> Hi Vino, > > >>>>> > > >>>>> Now I feel that we may have different understandings about what kind > > >> of > > >>>>> problems or improvements you want to > > >>>>> resolve.
Currently, most of the feedback is focusing on *how to do a > >>>>> proper local aggregation to improve performance > >>>>> and maybe solve the data skew issue*. And my gut feeling is this is > >>>>> exactly what users want in the first place, > >>>>> especially those +1s. (Sorry for trying to summarize here; please correct > >>> me > >>>> if > >>>>> I'm wrong). > >>>>> > >>>>> But I still think the design has somewhat diverged from the goal. If we > >>>> want > >>>>> to have an efficient and powerful way to > >>>>> do local aggregation, supporting an intermediate result type is > >> essential, > >>>> IMO. > >>>>> Both the runtime's `AggregateFunction` and > >>>>> SQL's `UserDefinedAggregateFunction` have proper support for > >>>> intermediate > >>>>> result types and can do a `merge` operation > >>>>> on them. > >>>>> > >>>>> Now, we have a lightweight alternative which performs well and > >> fits > >>>>> nicely with the local aggregation requirements. > >>>>> Most importantly, it's much less complex because it's stateless. > >> And > >>>> it > >>>>> can also achieve a similar multiple-aggregation > >>>>> scenario. > >>>>> > >>>>> I am still not convinced why we shouldn't consider it as a first step. > >>>>> > >>>>> Best, > >>>>> Kurt > >>>>> > >>>>> > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com> > >>>> wrote: > >>>>> > >>>>>> Hi Kurt, > >>>>>> > >>>>>> Thanks for your comments. > >>>>>> > >>>>>> It seems we both implemented a local aggregation feature to optimize for > >>> the > >>>>>> issue of data skew. > >>>>>> However, IMHO, the API level at which the optimization pays off is different.
> > >>>>>> > > >>>>>> *Your optimization benefits Flink SQL and is not user-facing. (If > > >>>>> I > > >>>>>> understand it incorrectly, please correct this.)* > > >>>>>> *Our implementation employs it as an optimization tool API for > > >>>>> DataStream; > > >>>>>> it is just like a local version of the keyBy API.* > > >>>>>> > > >>>>>> Based on this, I want to say supporting it as a DataStream API can > > >>> provide > > >>>>>> these advantages: > > >>>>>> > > >>>>>> > > >>>>>> - The localKeyBy API has clear semantics, and it's flexible not > > >>> only > > >>>>> for > > >>>>>> processing data skew but also for implementing some use cases; > > >>> for > > >>>>>> example, if we want to calculate a multiple-level aggregation, > > >>> we > > >>>>> can > > >>>>>> do > > >>>>>> the multiple-level aggregation in the local aggregation: > > >>>>>> input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here > > >> "a" > > >>>> is > > >>>>> a > > >>>>>> sub-category, while "b" is a category; here we do not need to > > >>>> shuffle > > >>>>>> data > > >>>>>> over the network. > > >>>>>> - The users of the DataStream API will benefit from this. Actually, > > >> we > > >>>>> have > > >>>>>> a lot of scenarios that need the DataStream API. Currently, the > > >> DataStream > > >>>> API > > >>>>> is > > >>>>>> the cornerstone of the physical plan of Flink SQL. With a > > >>> localKeyBy > > >>>>>> API, > > >>>>>> the optimization of SQL may at least use this optimized API; > > >> this > > >>>> is a > > >>>>>> further topic. > > >>>>>> - Based on the window operator, our state would benefit from > > >> Flink > > >>>>> state > > >>>>>> and checkpoints; we do not need to worry about OOM and job > > >> failures. > > >>>>>> > > >>>>>> Now, about your questions: > > >>>>>> > > >>>>>> 1.
About our design not being able to change the > > >>>>>> data type, and about the implementation of average: > > >>>>>> > > >>>>>> Just like my reply to Hequn, localKeyBy is an API provided to > > >> the > > >>>>> users > > >>>>>> who use the DataStream API to build their jobs. > > >>>>>> Users should know its semantics and the difference from the keyBy API, > > >> so > > >>>> if > > >>>>>> they want the average aggregation, they should carry the local sum > > >>>> result > > >>>>>> and the local count result. > > >>>>>> I admit that it would be more convenient to use keyBy directly. But we > > >> need > > >>>> to > > >>>>>> pay a little price when we get some benefits. I think this price is > > >>>>>> reasonable, considering that the DataStream API itself is a > > >> low-level > > >>>> API > > >>>>>> (at least for now). > > >>>>>> > > >>>>>> 2. About the stateless operator and > > >>>>>> `StreamOperator::prepareSnapshotPreBarrier()`: > > >>>>>> > > >>>>>> Actually, I have discussed this opinion with @dianfu in the old > > >> mail > > >>>>>> thread. I will copy my opinion from there: > > >>>>>> > > >>>>>> - for your design, you still need somewhere to let the users > > >>>>> configure > > >>>>>> the trigger threshold (maybe memory availability?); this design > > >>>> cannot > > >>>>>> guarantee deterministic semantics (it will bring trouble for > > >>>> testing > > >>>>>> and > > >>>>>> debugging). > > >>>>>> - if the implementation depends on the timing of the checkpoint, it > > >>>> would > > >>>>>> affect the checkpoint's progress, and the buffered data may > > >> cause an > > >>>> OOM > > >>>>>> issue. In addition, if the operator is stateless, it cannot > > >>> provide > > >>>>>> fault > > >>>>>> tolerance. > > >>>>>> > > >>>>>> Best, > > >>>>>> Vino > > >>>>>> > > >>>>>> Kurt Young <ykt...@gmail.com> wrote on Tuesday, June 18, 2019 at 9:22 AM: > > >>>>>> > > >>>>>>> Hi Vino, > > >>>>>>> > > >>>>>>> Thanks for the proposal, I like the general idea, and IMO it's a > > >> very > > >>>>> useful > > >>>>>>> feature.
> > >>>>>>> But after reading through the document, I feel that we may over-design > > >>>>>> the > > >>>>>>> required > > >>>>>>> operator for proper local aggregation. The main reason is we want > > >>> to > > >>>>>> have a > > >>>>>>> clear definition and behavior for the "local keyed state", which > > >>> in > > >>>> my > > >>>>>>> opinion is not > > >>>>>>> necessary for local aggregation, at least for a start. > > >>>>>>> > > >>>>>>> Another issue I noticed is that the local keyBy operator cannot > > >> change the > > >>>>>> element > > >>>>>>> type; this will > > >>>>>>> also restrict a lot of use cases which can benefit from local > > >>>>>>> aggregation, like "average". > > >>>>>>> > > >>>>>>> We also did similar logic in SQL, and the only thing that needs to be > > >> done > > >>>> is > > >>>>>>> to introduce > > >>>>>>> a stateless lightweight operator which is *chained* before > > >>> `keyBy()`. > > >>>>> The > > >>>>>>> operator will flush all buffered > > >>>>>>> elements during `StreamOperator::prepareSnapshotPreBarrier()` and > > >>>> thus keep > > >>>>>>> itself stateless. > > >>>>>>> By the way, in an earlier version we also tried a similar > > >> approach > > >>>> by > > >>>>>>> introducing a stateful > > >>>>>>> local aggregation operator, but it did not perform as well as the > > >>>> latter > > >>>>>> one, > > >>>>>>> and it also affected the barrier > > >>>>>>> alignment time. The latter one is fairly simple and more > > >> efficient. > > >>>>>>> > > >>>>>>> I would highly suggest you consider having a stateless > > >> approach > > >>>> as > > >>>>>> the > > >>>>>>> first step. > > >>>>>>> > > >>>>>>> Best, > > >>>>>>> Kurt > > >>>>>>> > > >>>>>>> > > >>>>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> > > >> wrote: > > >>>>>>> > > >>>>>>>> Hi Vino, > > >>>>>>>> > > >>>>>>>> Thanks for the proposal.
> > >>>>>>>> > > >>>>>>>> Regarding the "input.keyBy(0).sum(1)" vs > > >>>>>>>> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)" comparison, > > >> have > > >>>> you > > >>>>>>> done > > >>>>>>>> any benchmarks? > > >>>>>>>> Because I'm curious about how much performance improvement we > > >>>> can get > > >>>>>> by > > >>>>>>>> using a count window as the local operator. > > >>>>>>>> > > >>>>>>>> Best, > > >>>>>>>> Jark > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com > > >>> > > >>>>> wrote: > > >>>>>>>> > > >>>>>>>>> Hi Hequn, > > >>>>>>>>> > > >>>>>>>>> Thanks for your reply. > > >>>>>>>>> > > >>>>>>>>> The purpose of the localKeyBy API is to provide a tool which lets > > >>>>>> users > > >>>>>>> do > > >>>>>>>>> local pre-aggregation. The behavior of the > > >>> pre-aggregation > > >>>>> is > > >>>>>>>>> similar to the keyBy API. > > >>>>>>>>> > > >>>>>>>>> So the three cases are different; I will describe them one by > > >>>> one: > > >>>>>>>>> > > >>>>>>>>> 1. input.keyBy(0).sum(1) > > >>>>>>>>> > > >>>>>>>>> *In this case, the result is event-driven; each event can > > >>> produce > > >>>>> one > > >>>>>>> sum > > >>>>>>>>> aggregation result, and it is the latest one from the source > > >>>> start.* > > >>>>>>>>> > > >>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1) > > >>>>>>>>> > > >>>>>>>>> *In this case, the semantics may have a problem: it would do > > >> the > > >>>>> local > > >>>>>>> sum > > >>>>>>>>> aggregation and produce the latest partial result from > > >> the > > >>>>>> source > > >>>>>>>>> start for every event.
* > > >>>>>>>>> *These latest partial results from the same key are hashed to > > >>> one > > >>>>>> node > > >>>>>>> to > > >>>>>>>>> do the global sum aggregation.* > > >>>>>>>>> *In the global aggregation, when it receives multiple partial > > >>>>> results > > >>>>>>>> (they > > >>>>>>>>> are all calculated from the source start), summing them will produce > > >>>> the > > >>>>>>> wrong > > >>>>>>>>> result.* > > >>>>>>>>> > > >>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1) > > >>>>>>>>> > > >>>>>>>>> *In this case, it would just get a partial aggregation result > > >>> for > > >>>>>> the 5 > > >>>>>>>>> records in the count window. The partial aggregation results > > >>> from > > >>>>> the > > >>>>>>>> same > > >>>>>>>>> key will be aggregated globally.* > > >>>>>>>>> > > >>>>>>>>> So the first case and the third case can get the *same* > > >> result; > > >>>> the > > >>>>>>>>> difference is the output style and the latency. > > >>>>>>>>> > > >>>>>>>>> Generally speaking, the localKeyBy API is just an optimization > > >>>> API. > > >>>>> We > > >>>>>>> do > > >>>>>>>>> not limit the user's usage, but the user has to understand > > >> its > > >>>>>>> semantics > > >>>>>>>>> and use it correctly. > > >>>>>>>>> > > >>>>>>>>> Best, > > >>>>>>>>> Vino > > >>>>>>>>> > > >>>>>>>>> Hequn Cheng <chenghe...@gmail.com> wrote on Monday, June 17, 2019 at 4:18 PM: > > >>>>>>>>> > > >>>>>>>>>> Hi Vino, > > >>>>>>>>>> > > >>>>>>>>>> Thanks for the proposal, I think it is a very good feature! > > >>>>>>>>>> > > >>>>>>>>>> One thing I want to make sure of is the semantics of > > >>>>>> `localKeyBy`. > > >>>>>>>> From > > >>>>>>>>>> the document, the `localKeyBy` API returns an instance of > > >>>>>>> `KeyedStream` > > >>>>>>>>>> which can also perform sum(), so in this case, what are the > > >>>>> semantics > > >>>>>>> of > > >>>>>>>>>> `localKeyBy()`? For example, will the following code share > > >>> the > > >>>>> same > > >>>>>>>>> result?
> > >>>>>>>>>> and what are the differences between them? > > >>>>>>>>>> > > >>>>>>>>>> 1. input.keyBy(0).sum(1) > > >>>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1) > > >>>>>>>>>> 3. > > >> input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1) > > >>>>>>>>>> > > >>>>>>>>>> It would also be great if we could add this to the document. > > >>> Thank > > >>>>> you > > >>>>>>>> very > > >>>>>>>>>> much. > > >>>>>>>>>> > > >>>>>>>>>> Best, Hequn > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang < > > >>>>> yanghua1...@gmail.com> > > >>>>>>>>> wrote: > > >>>>>>>>>> > > >>>>>>>>>>> Hi Aljoscha, > > >>>>>>>>>>> > > >>>>>>>>>>> I have looked at the "*Process*" section of the FLIP wiki > > >>>> page. [1] > > >>>>>> This > > >>>>>>>>> mail > > >>>>>>>>>>> thread indicates that it has proceeded to the third step. > > >>>>>>>>>>> > > >>>>>>>>>>> When I looked at the fourth step (the vote step), I didn't > > >> find > > >>>> the > > >>>>>>>>>>> prerequisites for starting the voting process. > > >>>>>>>>>>> > > >>>>>>>>>>> Considering that the discussion of this feature has been > > >>> done > > >>>>> in > > >>>>>>> the > > >>>>>>>>> old > > >>>>>>>>>>> thread, [2] can you tell me when I should start > > >> voting? > > >>>> Can > > >>>>> I > > >>>>>>>> start > > >>>>>>>>>> now?
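The three usages that Hequn lists, together with Vino's earlier explanation of their semantics, can be checked with a small simulation (plain Python, not Flink APIs): case 2 re-emits the running local sum for every event, so the global sum over-counts, while case 3 emits each count-window's partial exactly once and matches the plain `keyBy().sum()` result.

```python
from collections import defaultdict

# A small simulation (not Flink code) of the three usages discussed above,
# for a single key "k" receiving ten events of value 1.

events = [("k", 1)] * 10

def global_sum(partials):
    total = defaultdict(int)
    for key, value in partials:
        total[key] += value
    return dict(total)

# Case 1: keyBy(0).sum(1) -- the final answer is 10.
case1 = global_sum(events)

# Case 2: localKeyBy(0).sum(1) emits the *running* sum after each event,
# so partials 1, 2, ..., 10 all reach the global aggregation.
running, partials2 = 0, []
for key, value in events:
    running += value
    partials2.append((key, running))
case2 = global_sum(partials2)   # 1 + 2 + ... + 10 = 55, wrong

# Case 3: localKeyBy(0).countWindow(5).sum(1) emits one partial per
# count window: two windows of five 1s each.
partials3 = [("k", 5), ("k", 5)]
case3 = global_sum(partials3)

print(case1, case2, case3)   # {'k': 10} {'k': 55} {'k': 10}
```

This makes concrete why the thread stresses that users must understand the localKeyBy semantics: cases 1 and 3 agree on the final value, while case 2 double-counts.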
> > >>>>>>>>>>> > > >>>>>>>>>>> Best, > > >>>>>>>>>>> Vino > > >>>>>>>>>>> > > >>>>>>>>>>> [1]: > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up > > >>>>>>>>>>> [2]: > > >>>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308 > > >>>>>>>>>>> > > >>>>>>>>>>> leesf <leesf0...@gmail.com> wrote on Thursday, June 13, 2019 at 9:19 AM: > > >>>>>>>>>>> > > >>>>>>>>>>>> +1 for the FLIP, thanks Vino for your efforts. > > >>>>>>>>>>>> > > >>>>>>>>>>>> Best, > > >>>>>>>>>>>> Leesf > > >>>>>>>>>>>> > > >>>>>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Wednesday, June 12, 2019 at 5:46 PM: > > >>>>>>>>>>>> > > >>>>>>>>>>>>> Hi folks, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> I would like to start the FLIP discussion thread about > > >> supporting > > >>>>>>>>>> local > > >>>>>>>>>>>>> aggregation in Flink. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> In short, this feature can effectively alleviate data skew.
> > >>>>>>> This is the > > >>>>>>>>>>>>> FLIP: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> *Motivation* (copied from the FLIP) > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Currently, keyed streams are widely used to perform > > >>>>>> aggregating > > >>>>>>>>>>>> operations > > >>>>>>>>>>>>> (e.g., reduce, sum and window) on the elements that > > >>> have > > >>>>> the > > >>>>>>> same > > >>>>>>>>>> key. > > >>>>>>>>>>>> When > > >>>>>>>>>>>>> executed at runtime, the elements with the same key > > >>> will > > >>>> be > > >>>>>>> sent > > >>>>>>>> to > > >>>>>>>>>> and > > >>>>>>>>>>>>> aggregated by the same task. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> The performance of these aggregating operations is > > >> very > > >>>>>>> sensitive > > >>>>>>>>> to > > >>>>>>>>>>> the > > >>>>>>>>>>>>> distribution of keys. In the cases where the > > >>> distribution > > >>>>> of > > >>>>>>> keys > > >>>>>>>>>>>> follows a > > >>>>>>>>>>>>> power law, the performance will be significantly > > >>>>>> degraded. > > >>>>>>>>> More > > >>>>>>>>>>>>> unluckily, increasing the degree of parallelism does > > >>> not > > >>>>> help > > >>>>>>>> when > > >>>>>>>>> a > > >>>>>>>>>>> task > > >>>>>>>>>>>>> is overloaded by a single key. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Local aggregation is a widely-adopted method to mitigate > > >>>> the > > >>>>>>>>>> performance > > >>>>>>>>>>>>> degradation caused by data skew. We can decompose the > > >> aggregating > > >>>>>>>> operations > > >>>>>>>>>> into > > >>>>>>>>>>>> two > > >>>>>>>>>>>>> phases.
In the first phase, we aggregate the elements > > >>> of > > >>>>> the > > >>>>>>> same > > >>>>>>>>> key > > >>>>>>>>>>> at > > >>>>>>>>>>>>> the sender side to obtain partial results. Then in > > >> the > > >>>>> second > > >>>>>>>>> phase, > > >>>>>>>>>>>> these > > >>>>>>>>>>>>> partial results are sent to receivers according to > > >>> their > > >>>>> keys > > >>>>>>> and > > >>>>>>>>> are > > >>>>>>>>>>>>> combined to obtain the final result. Since the number > > >>> of > > >>>>>>> partial > > >>>>>>>>>>> results > > >>>>>>>>>>>>> received by each receiver is limited by the number of > > >>>>>> senders, > > >>>>>>>> the > > >>>>>>>>>>>>> imbalance among receivers can be reduced. Besides, by > > >>>>>> reducing > > >>>>>>>> the > > >>>>>>>>>>> amount > > >>>>>>>>>>>>> of transferred data, the performance can be further > > >>>>> improved. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> *More details*: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Design documentation: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Old discussion thread: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308 > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> JIRA: FLINK-12786 < > > >>>>>>>>> https://issues.apache.org/jira/browse/FLINK-12786 > > >>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> We are looking forward to your feedback!
> > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Best, > > >>>>>>>>>>>>> Vino > > >>>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>> > > >>>>>>>> > > >>>>>>> > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > > >