Hi vino, Sorry, but I don't see a consensus about reusing the window operator and keeping the API design of localKeyBy. I think we should definitely give this topic more thought.
I also try to loop in Stephan for this discussion. Best, Kurt On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com> wrote: > Hi all, > > I am happy we have a wonderful discussion and received many valuable > opinions in the last few days. > > Now, let me try to summarize what we have reached consensus about the > changes in the design. > > - provide a unified abstraction to support two kinds of implementation; > - reuse WindowOperator and try to enhance it so that we can make the > intermediate result of the local aggregation can be buffered and > flushed to > support two kinds of implementation; > - keep the API design of localKeyBy, but declare the disabled some APIs > we cannot support currently, and provide a configurable API for users to > choose how to handle intermediate result; > > The above three points have been updated in the design doc. Any > questions, please let me know. > > @Aljoscha Krettek <aljos...@apache.org> What do you think? Any further > comments? > > Best, > Vino > > vino yang <yanghua1...@gmail.com> 于2019年6月20日周四 下午2:02写道: > > > Hi Kurt, > > > > Thanks for your comments. > > > > It seems we come to a consensus that we should alleviate the performance > > degraded by data skew with local aggregation. In this FLIP, our key > > solution is to introduce local keyed partition to achieve this goal. > > > > I also agree that we can benefit a lot from the usage of > > AggregateFunction. In combination with localKeyBy, We can easily use it > to > > achieve local aggregation: > > > > - input.localKeyBy(0).aggregate() > > - input.localKeyBy(0).window().aggregate() > > > > > > I think the only problem here is the choices between > > > > - (1) Introducing a new primitive called localKeyBy and implement > > local aggregation with existing operators, or > > - (2) Introducing an operator called localAggregation which is > > composed of a key selector, a window-like operator, and an aggregate > > function. 
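For readers following the thread: the intermediate-result ("ACC") idea that distinguishes option (2) can be sketched with a small, framework-free Python simulation. The function names below are illustrative only, not Flink API; they mirror the create/add/merge/get-result shape of an AggregateFunction-style average.

```python
# Minimal sketch of an AggregateFunction-style average with an explicit
# intermediate ("ACC") type. Plain Python, no Flink involved; names are
# illustrative, not Flink's actual interface.

def create_accumulator():
    # ACC for average: (sum, count)
    return (0, 0)

def add(acc, value):
    s, c = acc
    return (s + value, c + 1)

def merge(acc1, acc2):
    # Merging partial (local) results is what makes pre-aggregation work:
    # each local task produces an ACC, the final task merges them.
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

def get_result(acc):
    s, c = acc
    return s / c if c else None

# Two "local" tasks each pre-aggregate their partition of one key ...
local1 = add(add(create_accumulator(), 1), 2)   # (3, 2)
local2 = add(create_accumulator(), 6)           # (6, 1)
# ... and the final task merges the partial results.
print(get_result(merge(local1, local2)))  # 3.0
```

The key property is that the ACC type (sum, count) differs from both the input type (a number) and the output type (an average), which is why a local operator that cannot change the element type cannot express this.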
> > > > > > There may exist some optimization opportunities by providing a composited > > interface for local aggregation. But at the same time, in my opinion, we > > lose flexibility (Or we need certain efforts to achieve the same > > flexibility). > > > > As said in the previous mails, we have many use cases where the > > aggregation is very complicated and cannot be performed with > > AggregateFunction. For example, users may perform windowed aggregations > > according to time, data values, or even external storage. Typically, they > > now use KeyedProcessFunction or customized triggers to implement these > > aggregations. It's not easy to address data skew in such cases with a > > composited interface for local aggregation. > > > > Given that Data Stream API is exactly targeted at these cases where the > > application logic is very complicated and optimization does not matter, I > > think it's a better choice to provide a relatively low-level and > canonical > > interface. > > > > The composited interface, on the other side, may be a good choice in > > declarative interfaces, including SQL and Table API, as it allows more > > optimization opportunities. > > > > Best, > > Vino > > > > > > Kurt Young <ykt...@gmail.com> 于2019年6月20日周四 上午10:15写道: > > > >> Hi all, > >> > >> As vino said in previous emails, I think we should first discuss and > >> decide > >> what kind of use cases this FLIP want to > >> resolve, and what the API should look like. From my side, I think this > is > >> probably the root cause of current divergence. > >> > >> My understand is (from the FLIP title and motivation section of the > >> document), we want to have a proper support of > >> local aggregation, or pre aggregation. This is not a very new idea, most > >> SQL engine already did this improvement. And > >> the core concept about this is, there should be an AggregateFunction, no > >> matter it's a Flink runtime's AggregateFunction or > >> SQL's UserDefinedAggregateFunction. 
Both aggregation have concept of > >> intermediate data type, sometimes we call it ACC. > >> I quickly went through the POC piotr did before [1], it also directly > uses > >> AggregateFunction. > >> > >> But the thing is, after reading the design of this FLIP, I can't help > >> myself feeling that this FLIP is not targeting to have a proper > >> local aggregation support. It actually want to introduce another > concept: > >> LocalKeyBy, and how to split and merge local key groups, > >> and how to properly support state on local key. Local aggregation just > >> happened to be one possible use case of LocalKeyBy. > >> But it lacks supporting the essential concept of local aggregation, > which > >> is intermediate data type. Without this, I really don't thing > >> it is a good fit of local aggregation. > >> > >> Here I want to make sure of the scope or the goal about this FLIP, do we > >> want to have a proper local aggregation engine, or we > >> just want to introduce a new concept called LocalKeyBy? > >> > >> [1]: https://github.com/apache/flink/pull/4626 > >> > >> Best, > >> Kurt > >> > >> > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> > wrote: > >> > >> > Hi Hequn, > >> > > >> > Thanks for your comments! > >> > > >> > I agree that allowing local aggregation reusing window API and > refining > >> > window operator to make it match both requirements (come from our and > >> Kurt) > >> > is a good decision! > >> > > >> > Concerning your questions: > >> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may be > >> > meaningless. > >> > > >> > Yes, it does not make sense in most cases. However, I also want to > note > >> > users should know the right semantics of localKeyBy and use it > >> correctly. > >> > Because this issue also exists for the global keyBy, consider this > >> example: > >> > input.keyBy(0).sum(1).keyBy(0).sum(1), the result is also meaningless. > >> > > >> > 2. 
About the semantics of > >> > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). > >> > > >> > Good catch! I agree with you that it's not good to enable all > >> > functionalities for localKeyBy from KeyedStream. > >> > Currently, We do not support some APIs such as > >> > connect/join/intervalJoin/coGroup. This is due to that we force the > >> > operators on LocalKeyedStreams chained with the inputs. > >> > > >> > Best, > >> > Vino > >> > > >> > > >> > Hequn Cheng <chenghe...@gmail.com> 于2019年6月19日周三 下午3:42写道: > >> > > >> > > Hi, > >> > > > >> > > Thanks a lot for your great discussion and great to see that some > >> > agreement > >> > > has been reached on the "local aggregate engine"! > >> > > > >> > > ===> Considering the abstract engine, > >> > > I'm thinking is it valuable for us to extend the current window to > >> meet > >> > > both demands raised by Kurt and Vino? There are some benefits we can > >> get: > >> > > > >> > > 1. The interfaces of the window are complete and clear. With > windows, > >> we > >> > > can define a lot of ways to split the data and perform different > >> > > computations. > >> > > 2. We can also leverage the window to do miniBatch for the global > >> > > aggregation, i.e, we can use the window to bundle data belong to the > >> same > >> > > key, for every bundle we only need to read and write once state. > This > >> can > >> > > greatly reduce state IO and improve performance. > >> > > 3. A lot of other use cases can also benefit from the window base on > >> > memory > >> > > or stateless. > >> > > > >> > > ===> As for the API, > >> > > I think it is good to make our API more flexible. However, we may > >> need to > >> > > make our API meaningful. > >> > > > >> > > Take my previous reply as an example, > >> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result may be > >> > meaningless. > >> > > Another example I find is the intervalJoin, e.g., > >> > > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). 
In this > >> case, it > >> > > will bring problems if input1 and input2 share different > parallelism. > >> We > >> > > don't know which input should the join chained with? Even if they > >> share > >> > the > >> > > same parallelism, it's hard to tell what the join is doing. There > are > >> > maybe > >> > > some other problems. > >> > > > >> > > From this point of view, it's at least not good to enable all > >> > > functionalities for localKeyBy from KeyedStream? > >> > > > >> > > Great to also have your opinions. > >> > > > >> > > Best, Hequn > >> > > > >> > > > >> > > > >> > > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com> > >> > wrote: > >> > > > >> > > > Hi Kurt and Piotrek, > >> > > > > >> > > > Thanks for your comments. > >> > > > > >> > > > I agree that we can provide a better abstraction to be compatible > >> with > >> > > two > >> > > > different implementations. > >> > > > > >> > > > First of all, I think we should consider what kind of scenarios we > >> need > >> > > to > >> > > > support in *API* level? > >> > > > > >> > > > We have some use cases which need to a customized aggregation > >> through > >> > > > KeyedProcessFunction, (in the usage of our localKeyBy.window they > >> can > >> > use > >> > > > ProcessWindowFunction). > >> > > > > >> > > > Shall we support these flexible use scenarios? > >> > > > > >> > > > Best, > >> > > > Vino > >> > > > > >> > > > Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午8:37写道: > >> > > > > >> > > > > Hi Piotr, > >> > > > > > >> > > > > Thanks for joining the discussion. Make “local aggregation" > >> abstract > >> > > > enough > >> > > > > sounds good to me, we could > >> > > > > implement and verify alternative solutions for use cases of > local > >> > > > > aggregation. Maybe we will find both solutions > >> > > > > are appropriate for different scenarios. > >> > > > > > >> > > > > Starting from a simple one sounds a practical way to go. 
What do > >> you > >> > > > think, > >> > > > > vino? > >> > > > > > >> > > > > Best, > >> > > > > Kurt > >> > > > > > >> > > > > > >> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski < > >> pi...@ververica.com> > >> > > > > wrote: > >> > > > > > >> > > > > > Hi Kurt and Vino, > >> > > > > > > >> > > > > > I think there is a trade of hat we need to consider for the > >> local > >> > > > > > aggregation. > >> > > > > > > >> > > > > > Generally speaking I would agree with Kurt about local > >> > > aggregation/pre > >> > > > > > aggregation not using Flink's state flush the operator on a > >> > > checkpoint. > >> > > > > > Network IO is usually cheaper compared to Disks IO. This has > >> > however > >> > > > > couple > >> > > > > > of issues: > >> > > > > > 1. It can explode number of in-flight records during > checkpoint > >> > > barrier > >> > > > > > alignment, making checkpointing slower and decrease the actual > >> > > > > throughput. > >> > > > > > 2. This trades Disks IO on the local aggregation machine with > >> CPU > >> > > (and > >> > > > > > Disks IO in case of RocksDB) on the final aggregation machine. > >> This > >> > > is > >> > > > > > fine, as long there is no huge data skew. If there is only a > >> > handful > >> > > > (or > >> > > > > > even one single) hot keys, it might be better to keep the > >> > persistent > >> > > > > state > >> > > > > > in the LocalAggregationOperator to offload final aggregation > as > >> > much > >> > > as > >> > > > > > possible. > >> > > > > > 3. With frequent checkpointing local aggregation effectiveness > >> > would > >> > > > > > degrade. > >> > > > > > > >> > > > > > I assume Kurt is correct, that in your use cases stateless > >> operator > >> > > was > >> > > > > > behaving better, but I could easily see other use cases as > well. 
> >> > For > >> > > > > > example someone is already using RocksDB, and his job is > >> > bottlenecked > >> > > > on > >> > > > > a > >> > > > > > single window operator instance because of the data skew. In > >> that > >> > > case > >> > > > > > stateful local aggregation would be probably a better choice. > >> > > > > > > >> > > > > > Because of that, I think we should eventually provide both > >> versions > >> > > and > >> > > > > in > >> > > > > > the initial version we should at least make the “local > >> aggregation > >> > > > > engine” > >> > > > > > abstract enough, that one could easily provide different > >> > > implementation > >> > > > > > strategy. > >> > > > > > > >> > > > > > Piotrek > >> > > > > > > >> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> > >> wrote: > >> > > > > > > > >> > > > > > > Hi, > >> > > > > > > > >> > > > > > > For the trigger, it depends on what operator we want to use > >> under > >> > > the > >> > > > > > API. > >> > > > > > > If we choose to use window operator, > >> > > > > > > we should also use window's trigger. However, I also think > >> reuse > >> > > > window > >> > > > > > > operator for this scenario may not be > >> > > > > > > the best choice. The reasons are the following: > >> > > > > > > > >> > > > > > > 1. As a lot of people already pointed out, window relies > >> heavily > >> > on > >> > > > > state > >> > > > > > > and it will definitely effect performance. You can > >> > > > > > > argue that one can use heap based statebackend, but this > will > >> > > > introduce > >> > > > > > > extra coupling. Especially we have a chance to > >> > > > > > > design a pure stateless operator. > >> > > > > > > 2. The window operator is *the most* complicated operator > >> Flink > >> > > > > currently > >> > > > > > > have. 
Maybe we only need to pick a subset of > >> > > > > > > window operator to achieve the goal, but once the user wants > >> to > >> > > have > >> > > > a > >> > > > > > deep > >> > > > > > > look at the localAggregation operator, it's still > >> > > > > > > hard to find out what's going on under the window operator. > >> For > >> > > > > > simplicity, > >> > > > > > > I would also recommend we introduce a dedicated > >> > > > > > > lightweight operator, which also much easier for a user to > >> learn > >> > > and > >> > > > > use. > >> > > > > > > > >> > > > > > > For your question about increasing the burden in > >> > > > > > > `StreamOperator::prepareSnapshotPreBarrier()`, the only > thing > >> > this > >> > > > > > function > >> > > > > > > need > >> > > > > > > to do is output all the partial results, it's purely cpu > >> > workload, > >> > > > not > >> > > > > > > introducing any IO. I want to point out that even if we have > >> this > >> > > > > > > cost, we reduced another barrier align cost of the operator, > >> > which > >> > > is > >> > > > > the > >> > > > > > > sync flush stage of the state, if you introduced state. This > >> > > > > > > flush actually will introduce disk IO, and I think it's > >> worthy to > >> > > > > > exchange > >> > > > > > > this cost with purely CPU workload. And we do have some > >> > > > > > > observations about these two behavior (as i said before, we > >> > > actually > >> > > > > > > implemented both solutions), the stateless one actually > >> performs > >> > > > > > > better both in performance and barrier align time. > >> > > > > > > > >> > > > > > > Best, > >> > > > > > > Kurt > >> > > > > > > > >> > > > > > > > >> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang < > >> yanghua1...@gmail.com > >> > > > >> > > > > wrote: > >> > > > > > > > >> > > > > > >> Hi Kurt, > >> > > > > > >> > >> > > > > > >> Thanks for your example. Now, it looks more clearly for me. 
> >> > > > > > >> > >> > > > > > >> From your example code snippet, I saw the localAggregate > API > >> has > >> > > > three > >> > > > > > >> parameters: > >> > > > > > >> > >> > > > > > >> 1. key field > >> > > > > > >> 2. PartitionAvg > >> > > > > > >> 3. CountTrigger: Does this trigger comes from window > >> package? > >> > > > > > >> > >> > > > > > >> I will compare our and your design from API and operator > >> level: > >> > > > > > >> > >> > > > > > >> *From the API level:* > >> > > > > > >> > >> > > > > > >> As I replied to @dianfu in the old email thread,[1] the > >> Window > >> > API > >> > > > can > >> > > > > > >> provide the second and the third parameter right now. > >> > > > > > >> > >> > > > > > >> If you reuse specified interface or class, such as > *Trigger* > >> or > >> > > > > > >> *CounterTrigger* provided by window package, but do not use > >> > window > >> > > > > API, > >> > > > > > >> it's not reasonable. > >> > > > > > >> And if you do not reuse these interface or class, you would > >> need > >> > > to > >> > > > > > >> introduce more things however they are looked similar to > the > >> > > things > >> > > > > > >> provided by window package. > >> > > > > > >> > >> > > > > > >> The window package has provided several types of the window > >> and > >> > > many > >> > > > > > >> triggers and let users customize it. What's more, the user > is > >> > more > >> > > > > > familiar > >> > > > > > >> with Window API. > >> > > > > > >> > >> > > > > > >> This is the reason why we just provide localKeyBy API and > >> reuse > >> > > the > >> > > > > > window > >> > > > > > >> API. It reduces unnecessary components such as triggers and > >> the > >> > > > > > mechanism > >> > > > > > >> of buffer (based on count num or time). > >> > > > > > >> And it has a clear and easy to understand semantics. 
> >> > > > > > >> > >> > > > > > >> *From the operator level:* > >> > > > > > >> > >> > > > > > >> We reused window operator, so we can get all the benefits > >> from > >> > > state > >> > > > > and > >> > > > > > >> checkpoint. > >> > > > > > >> > >> > > > > > >> From your design, you named the operator under > localAggregate > >> > API > >> > > > is a > >> > > > > > >> *stateless* operator. IMO, it is still a state, it is just > >> not > >> > > Flink > >> > > > > > >> managed state. > >> > > > > > >> About the memory buffer (I think it's still not very clear, > >> if > >> > you > >> > > > > have > >> > > > > > >> time, can you give more detail information or answer my > >> > > questions), > >> > > > I > >> > > > > > have > >> > > > > > >> some questions: > >> > > > > > >> > >> > > > > > >> - if it just a raw JVM heap memory buffer, how to support > >> > fault > >> > > > > > >> tolerance, if the job is configured EXACTLY-ONCE semantic > >> > > > guarantee? > >> > > > > > >> - if you thought the memory buffer(non-Flink state), has > >> > better > >> > > > > > >> performance. In our design, users can also config HEAP > >> state > >> > > > backend > >> > > > > > to > >> > > > > > >> provide the performance close to your mechanism. > >> > > > > > >> - `StreamOperator::prepareSnapshotPreBarrier()` related > to > >> the > >> > > > > timing > >> > > > > > of > >> > > > > > >> snapshot. IMO, the flush action should be a synchronized > >> > action? > >> > > > (if > >> > > > > > >> not, > >> > > > > > >> please point out my mistake) I still think we should not > >> > depend > >> > > on > >> > > > > the > >> > > > > > >> timing of checkpoint. Checkpoint related operations are > >> > inherent > >> > > > > > >> performance sensitive, we should not increase its burden > >> > > anymore. 
> >> > > > > Our > >> > > > > > >> implementation based on the mechanism of Flink's > >> checkpoint, > >> > > which > >> > > > > can > >> > > > > > >> benefit from the asnyc snapshot and incremental > checkpoint. > >> > IMO, > >> > > > the > >> > > > > > >> performance is not a problem, and we also do not find the > >> > > > > performance > >> > > > > > >> issue > >> > > > > > >> in our production. > >> > > > > > >> > >> > > > > > >> [1]: > >> > > > > > >> > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308 > >> > > > > > >> > >> > > > > > >> Best, > >> > > > > > >> Vino > >> > > > > > >> > >> > > > > > >> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午2:27写道: > >> > > > > > >> > >> > > > > > >>> Yeah, sorry for not expressing myself clearly. I will try > to > >> > > > provide > >> > > > > > more > >> > > > > > >>> details to make sure we are on the same page. > >> > > > > > >>> > >> > > > > > >>> For DataStream API, it shouldn't be optimized > automatically. > >> > You > >> > > > have > >> > > > > > to > >> > > > > > >>> explicitly call API to do local aggregation > >> > > > > > >>> as well as the trigger policy of the local aggregation. 
> Take > >> > > > average > >> > > > > > for > >> > > > > > >>> example, the user program may look like this (just a > draft): > >> > > > > > >>> > >> > > > > > >>> assuming the input type is DataStream<Tupl2<String, Int>> > >> > > > > > >>> > >> > > > > > >>> ds.localAggregate( > >> > > > > > >>> 0, // The > local > >> > key, > >> > > > > which > >> > > > > > >> is > >> > > > > > >>> the String from Tuple2 > >> > > > > > >>> PartitionAvg(1), // The partial > >> > > aggregation > >> > > > > > >>> function, produces Tuple2<Long, Int>, indicating sum and > >> count > >> > > > > > >>> CountTrigger.of(1000L) // Trigger policy, note > >> this > >> > > > should > >> > > > > be > >> > > > > > >>> best effort, and also be composited with time based or > >> memory > >> > > size > >> > > > > > based > >> > > > > > >>> trigger > >> > > > > > >>> ) // The > return > >> > type > >> > > > is > >> > > > > > >> local > >> > > > > > >>> aggregate Tuple2<String, Tupl2<Long, Int>> > >> > > > > > >>> .keyBy(0) // Further keyby > it > >> > with > >> > > > > > >> required > >> > > > > > >>> key > >> > > > > > >>> .aggregate(1) // This will merge > all > >> > the > >> > > > > > partial > >> > > > > > >>> results and get the final average. > >> > > > > > >>> > >> > > > > > >>> (This is only a draft, only trying to explain what it > looks > >> > > like. ) > >> > > > > > >>> > >> > > > > > >>> The local aggregate operator can be stateless, we can > keep a > >> > > memory > >> > > > > > >> buffer > >> > > > > > >>> or other efficient data structure to improve the aggregate > >> > > > > performance. > >> > > > > > >>> > >> > > > > > >>> Let me know if you have any other questions. 
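Since the localAggregate draft above is only pseudo-code, a framework-free Python simulation of its intended data flow may help: a count-triggered local pre-aggregation producing (sum, count) partials per key, followed by a final per-key merge. This is a hedged sketch of the idea, not Flink code; the trigger and function names are assumptions.

```python
from collections import defaultdict

def local_aggregate(stream, trigger_count):
    """Buffer (sum, count) partials per key; emit and clear a key's
    partial once it has absorbed trigger_count elements."""
    buffer = {}  # key -> (sum, count); plain memory, not managed state
    for key, value in stream:
        s, c = buffer.get(key, (0, 0))
        buffer[key] = (s + value, c + 1)
        if buffer[key][1] >= trigger_count:
            yield key, buffer.pop(key)
    # Flush remaining partials at the end (in the stateless design this
    # flush would also happen before each checkpoint barrier).
    for key, partial in buffer.items():
        yield key, partial

def final_aggregate(partials):
    """Merge all partial (sum, count) results per key into averages."""
    acc = defaultdict(lambda: (0, 0))
    for key, (s, c) in partials:
        ts, tc = acc[key]
        acc[key] = (ts + s, tc + c)
    return {key: s / c for key, (s, c) in acc.items()}

stream = [("a", 1), ("a", 3), ("b", 10), ("a", 5), ("b", 20)]
print(final_aggregate(local_aggregate(stream, trigger_count=2)))
# {'a': 3.0, 'b': 15.0}
```

Note how the partial results for key "a" are emitted in two chunks, (4, 2) and (5, 1), and only the final merge turns them into a correct average — the point being made about needing an intermediate data type.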
> >> > > > > > >>> > >> > > > > > >>> Best, > >> > > > > > >>> Kurt > >> > > > > > >>> > >> > > > > > >>> > >> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang < > >> > yanghua1...@gmail.com > >> > > > > >> > > > > > wrote: > >> > > > > > >>> > >> > > > > > >>>> Hi Kurt, > >> > > > > > >>>> > >> > > > > > >>>> Thanks for your reply. > >> > > > > > >>>> > >> > > > > > >>>> Actually, I am not against you to raise your design. > >> > > > > > >>>> > >> > > > > > >>>> From your description before, I just can imagine your > >> > high-level > >> > > > > > >>>> implementation is about SQL and the optimization is inner > >> of > >> > the > >> > > > > API. > >> > > > > > >> Is > >> > > > > > >>> it > >> > > > > > >>>> automatically? how to give the configuration option about > >> > > trigger > >> > > > > > >>>> pre-aggregation? > >> > > > > > >>>> > >> > > > > > >>>> Maybe after I get more information, it sounds more > >> reasonable. > >> > > > > > >>>> > >> > > > > > >>>> IMO, first of all, it would be better to make your user > >> > > interface > >> > > > > > >>> concrete, > >> > > > > > >>>> it's the basis of the discussion. > >> > > > > > >>>> > >> > > > > > >>>> For example, can you give an example code snippet to > >> introduce > >> > > how > >> > > > > to > >> > > > > > >>> help > >> > > > > > >>>> users to process data skew caused by the jobs which built > >> with > >> > > > > > >> DataStream > >> > > > > > >>>> API? > >> > > > > > >>>> > >> > > > > > >>>> If you give more details we can discuss further more. I > >> think > >> > if > >> > > > one > >> > > > > > >>> design > >> > > > > > >>>> introduces an exact interface and another does not. > >> > > > > > >>>> > >> > > > > > >>>> The implementation has an obvious difference. 
For > example, > >> we > >> > > > > > introduce > >> > > > > > >>> an > >> > > > > > >>>> exact API in DataStream named localKeyBy, about the > >> > > > pre-aggregation > >> > > > > we > >> > > > > > >>> need > >> > > > > > >>>> to define the trigger mechanism of local aggregation, so > we > >> > find > >> > > > > > reused > >> > > > > > >>>> window API and operator is a good choice. This is a > >> reasoning > >> > > link > >> > > > > > from > >> > > > > > >>>> design to implementation. > >> > > > > > >>>> > >> > > > > > >>>> What do you think? > >> > > > > > >>>> > >> > > > > > >>>> Best, > >> > > > > > >>>> Vino > >> > > > > > >>>> > >> > > > > > >>>> > >> > > > > > >>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午11:58写道: > >> > > > > > >>>> > >> > > > > > >>>>> Hi Vino, > >> > > > > > >>>>> > >> > > > > > >>>>> Now I feel that we may have different understandings > about > >> > what > >> > > > > kind > >> > > > > > >> of > >> > > > > > >>>>> problems or improvements you want to > >> > > > > > >>>>> resolve. Currently, most of the feedback are focusing on > >> *how > >> > > to > >> > > > > do a > >> > > > > > >>>>> proper local aggregation to improve performance > >> > > > > > >>>>> and maybe solving the data skew issue*. And my gut > >> feeling is > >> > > > this > >> > > > > is > >> > > > > > >>>>> exactly what users want at the first place, > >> > > > > > >>>>> especially those +1s. (Sorry to try to summarize here, > >> please > >> > > > > correct > >> > > > > > >>> me > >> > > > > > >>>> if > >> > > > > > >>>>> i'm wrong). > >> > > > > > >>>>> > >> > > > > > >>>>> But I still think the design is somehow diverged from > the > >> > goal. > >> > > > If > >> > > > > we > >> > > > > > >>>> want > >> > > > > > >>>>> to have an efficient and powerful way to > >> > > > > > >>>>> have local aggregation, supporting intermedia result > type > >> is > >> > > > > > >> essential > >> > > > > > >>>> IMO. 
> >> > > > > > >>>>> Both runtime's `AggregateFunction` and > >> > > > > > >>>>> SQL`s `UserDefinedAggregateFunction` have a proper > >> support of > >> > > > > > >>>> intermediate > >> > > > > > >>>>> result type and can do `merge` operation > >> > > > > > >>>>> on them. > >> > > > > > >>>>> > >> > > > > > >>>>> Now, we have a lightweight alternatives which performs > >> well, > >> > > and > >> > > > > > >> have a > >> > > > > > >>>>> nice fit with the local aggregate requirements. > >> > > > > > >>>>> Mostly importantly, it's much less complex because it's > >> > > > stateless. > >> > > > > > >> And > >> > > > > > >>>> it > >> > > > > > >>>>> can also achieve the similar multiple-aggregation > >> > > > > > >>>>> scenario. > >> > > > > > >>>>> > >> > > > > > >>>>> I still not convinced why we shouldn't consider it as a > >> first > >> > > > step. > >> > > > > > >>>>> > >> > > > > > >>>>> Best, > >> > > > > > >>>>> Kurt > >> > > > > > >>>>> > >> > > > > > >>>>> > >> > > > > > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang < > >> > > > yanghua1...@gmail.com> > >> > > > > > >>>> wrote: > >> > > > > > >>>>> > >> > > > > > >>>>>> Hi Kurt, > >> > > > > > >>>>>> > >> > > > > > >>>>>> Thanks for your comments. > >> > > > > > >>>>>> > >> > > > > > >>>>>> It seems we both implemented local aggregation feature > to > >> > > > optimize > >> > > > > > >>> the > >> > > > > > >>>>>> issue of data skew. > >> > > > > > >>>>>> However, IMHO, the API level of optimizing revenue is > >> > > different. 
> >> > > > > > >>>>>> > >> > > > > > >>>>>> *Your optimization benefits from Flink SQL and it's not > >> > user's > >> > > > > > >>>> faces.(If > >> > > > > > >>>>> I > >> > > > > > >>>>>> understand it incorrectly, please correct this.)* > >> > > > > > >>>>>> *Our implementation employs it as an optimization tool > >> API > >> > for > >> > > > > > >>>>> DataStream, > >> > > > > > >>>>>> it just like a local version of the keyBy API.* > >> > > > > > >>>>>> > >> > > > > > >>>>>> Based on this, I want to say support it as a DataStream > >> API > >> > > can > >> > > > > > >>> provide > >> > > > > > >>>>>> these advantages: > >> > > > > > >>>>>> > >> > > > > > >>>>>> > >> > > > > > >>>>>> - The localKeyBy API has a clear semantic and it's > >> > flexible > >> > > > not > >> > > > > > >>> only > >> > > > > > >>>>> for > >> > > > > > >>>>>> processing data skew but also for implementing some > >> user > >> > > > cases, > >> > > > > > >>> for > >> > > > > > >>>>>> example, if we want to calculate the multiple-level > >> > > > aggregation, > >> > > > > > >>> we > >> > > > > > >>>>> can > >> > > > > > >>>>>> do > >> > > > > > >>>>>> multiple-level aggregation in the local aggregation: > >> > > > > > >>>>>> > input.localKeyBy("a").sum(1).localKeyBy("b").window(); > >> // > >> > > here > >> > > > > > >> "a" > >> > > > > > >>>> is > >> > > > > > >>>>> a > >> > > > > > >>>>>> sub-category, while "b" is a category, here we do not > >> need > >> > > to > >> > > > > > >>>> shuffle > >> > > > > > >>>>>> data > >> > > > > > >>>>>> in the network. > >> > > > > > >>>>>> - The users of DataStream API will benefit from this. > >> > > > Actually, > >> > > > > > >> we > >> > > > > > >>>>> have > >> > > > > > >>>>>> a lot of scenes need to use DataStream API. > Currently, > >> > > > > > >> DataStream > >> > > > > > >>>> API > >> > > > > > >>>>> is > >> > > > > > >>>>>> the cornerstone of the physical plan of Flink SQL. 
> >> With a > >> > > > > > >>> localKeyBy > >> > > > > > >>>>>> API, > >> > > > > > >>>>>> the optimization of SQL at least may use this > optimized > >> > API, > >> > > > > > >> this > >> > > > > > >>>> is a > >> > > > > > >>>>>> further topic. > >> > > > > > >>>>>> - Based on the window operator, our state would > benefit > >> > from > >> > > > > > >> Flink > >> > > > > > >>>>> State > >> > > > > > >>>>>> and checkpoint, we do not need to worry about OOM and > >> job > >> > > > > > >> failed. > >> > > > > > >>>>>> > >> > > > > > >>>>>> Now, about your questions: > >> > > > > > >>>>>> > >> > > > > > >>>>>> 1. About our design cannot change the data type and > about > >> > the > >> > > > > > >>>>>> implementation of average: > >> > > > > > >>>>>> > >> > > > > > >>>>>> Just like my reply to Hequn, the localKeyBy is an API > >> > provides > >> > > > to > >> > > > > > >> the > >> > > > > > >>>>> users > >> > > > > > >>>>>> who use DataStream API to build their jobs. > >> > > > > > >>>>>> Users should know its semantics and the difference with > >> > keyBy > >> > > > API, > >> > > > > > >> so > >> > > > > > >>>> if > >> > > > > > >>>>>> they want to the average aggregation, they should carry > >> > local > >> > > > sum > >> > > > > > >>>> result > >> > > > > > >>>>>> and local count result. > >> > > > > > >>>>>> I admit that it will be convenient to use keyBy > directly. > >> > But > >> > > we > >> > > > > > >> need > >> > > > > > >>>> to > >> > > > > > >>>>>> pay a little price when we get some benefits. I think > >> this > >> > > price > >> > > > > is > >> > > > > > >>>>>> reasonable. Considering that the DataStream API itself > >> is a > >> > > > > > >> low-level > >> > > > > > >>>> API > >> > > > > > >>>>>> (at least for now). > >> > > > > > >>>>>> > >> > > > > > >>>>>> 2. 
About stateless operator and > >> > > > > > >>>>>> `StreamOperator::prepareSnapshotPreBarrier()`: > >> > > > > > >>>>>> > >> > > > > > >>>>>> Actually, I have discussed this opinion with @dianfu in > >> the > >> > > old > >> > > > > > >> mail > >> > > > > > >>>>>> thread. I will copy my opinion from there: > >> > > > > > >>>>>> > >> > > > > > >>>>>> - for your design, you still need somewhere to give > the > >> > > users > >> > > > > > >>>>> configure > >> > > > > > >>>>>> the trigger threshold (maybe memory availability?), > >> this > >> > > > design > >> > > > > > >>>> cannot > >> > > > > > >>>>>> guarantee a deterministic semantics (it will bring > >> trouble > >> > > for > >> > > > > > >>>> testing > >> > > > > > >>>>>> and > >> > > > > > >>>>>> debugging). > >> > > > > > >>>>>> - if the implementation depends on the timing of > >> > checkpoint, > >> > > > it > >> > > > > > >>>> would > >> > > > > > >>>>>> affect the checkpoint's progress, and the buffered > data > >> > may > >> > > > > > >> cause > >> > > > > > >>>> OOM > >> > > > > > >>>>>> issue. In addition, if the operator is stateless, it > >> can > >> > not > >> > > > > > >>> provide > >> > > > > > >>>>>> fault > >> > > > > > >>>>>> tolerance. > >> > > > > > >>>>>> > >> > > > > > >>>>>> Best, > >> > > > > > >>>>>> Vino > >> > > > > > >>>>>> > >> > > > > > >>>>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午9:22写道: > >> > > > > > >>>>>> > >> > > > > > >>>>>>> Hi Vino, > >> > > > > > >>>>>>> > >> > > > > > >>>>>>> Thanks for the proposal, I like the general idea and > IMO > >> > it's > >> > > > > > >> very > >> > > > > > >>>>> useful > >> > > > > > >>>>>>> feature. > >> > > > > > >>>>>>> But after reading through the document, I feel that we > >> may > >> > > over > >> > > > > > >>>> design > >> > > > > > >>>>>> the > >> > > > > > >>>>>>> required > >> > > > > > >>>>>>> operator for proper local aggregation. 
The main reason is that we want to have a clear definition and behavior for the "local keyed state", which in my opinion is not necessary for local aggregation, at least for a start.

Another issue I noticed is that the local key-by operator cannot change the element type, which also restricts a lot of use cases that could benefit from local aggregation, like "average".

We also did similar logic in SQL, and the only thing that needs to be done is to introduce a stateless, lightweight operator which is *chained* before `keyBy()`. The operator flushes all buffered elements during `StreamOperator::prepareSnapshotPreBarrier()`, which keeps it stateless. By the way, in an earlier version we also tried a similar approach by introducing a stateful local aggregation operator, but it did not perform as well as the later one, and it also affected the barrier alignment time. The later one is fairly simple and more efficient.
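The stateless approach described here can be sketched roughly as follows. This is plain Python standing in for Flink's operator lifecycle; only `prepareSnapshotPreBarrier` mirrors an actual Flink hook, and the class, its other method names, and the `max_keys` threshold are invented for illustration:

```python
# Sketch (assumed shape, not Flink's operator API): a "stateless"
# local-aggregation operator that buffers partial sums in heap memory and
# flushes them downstream both when the buffer is full and right before a
# checkpoint barrier, so there is never any state to snapshot.

class LocalAggOperator:
    def __init__(self, downstream, max_keys=1000):
        self.buffer = {}             # key -> partial sum (heap memory only)
        self.downstream = downstream # collects emitted (key, partial) pairs
        self.max_keys = max_keys

    def process_element(self, key, value):
        self.buffer[key] = self.buffer.get(key, 0) + value
        if len(self.buffer) >= self.max_keys:
            self.flush()             # bound memory usage between barriers

    def prepare_snapshot_pre_barrier(self):
        # Emit everything before the barrier passes: afterwards the
        # operator holds no data, so nothing needs to be checkpointed.
        self.flush()

    def flush(self):
        for key, partial in self.buffer.items():
            self.downstream.append((key, partial))
        self.buffer.clear()

out = []
op = LocalAggOperator(out, max_keys=2)
op.process_element("a", 1)
op.process_element("a", 2)           # still buffered: only one key so far
op.process_element("b", 5)           # second key reaches the cap -> flush
op.process_element("a", 7)
op.prepare_snapshot_pre_barrier()    # flush the rest before the barrier
```

After the barrier-time flush, `out` holds `("a", 3)`, `("b", 5)`, `("a", 7)`: two partials from the size-triggered flush and one from the pre-barrier flush.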
I would highly suggest you consider a stateless approach as the first step.

Best,
Kurt

On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:

Hi Vino,

Thanks for the proposal.

Regarding "input.keyBy(0).sum(1)" vs "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done some benchmarks? I'm curious about how much performance improvement we can get by using a count window as the local operator.

Best,
Jark

On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:

Hi Hequn,

Thanks for your reply.

The purpose of the localKeyBy API is to provide a tool that lets users do pre-aggregation locally. The behavior of the pre-aggregation is similar to the keyBy API.
So the three cases are different; I will describe them one by one:

1. input.keyBy(0).sum(1)

*In this case, the result is event-driven: each event produces one sum aggregation result, which is the latest one from the source start.*

2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)

*In this case, the semantics have a problem: the local sum aggregation produces the latest partial result from the source start for every event. These latest partial results from the same key are hashed to one node for the global sum aggregation. When the global aggregation receives multiple partial results (each calculated from the source start) and sums them, it gets the wrong result.*

3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

*In this case, it gets one partial aggregation result for each 5 records in the count window.*
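The difference between case 2 (forwarding running sums) and case 3 (forwarding disjoint window partials) can be simulated in plain Python. This is a hedged sketch, not Flink code; a single key and hard-coded values are assumed:

```python
# Sketch (plain Python, not Flink): in case 2 the local operator emits a
# running sum per event and the global side adds those running sums
# together, double-counting. In case 3 the local operator emits one
# partial per 5-element count window, so the global side sums disjoint
# partials.

events = [("a", v) for v in [1, 2, 3, 4, 5, 6]]

# Case 2: local running sum, every update forwarded downstream.
running, forwarded = 0, []
for _, v in events:
    running += v
    forwarded.append(running)        # 1, 3, 6, 10, 15, 21
wrong_global = sum(forwarded)        # 56, not the true sum 21

# Case 3: local count window of 5, one partial forwarded per window fire.
partials, window = [], []
for _, v in events:
    window.append(v)
    if len(window) == 5:
        partials.append(sum(window)) # flush one disjoint partial
        window = []
# (the 6th element stays buffered until its window fires)
global_so_far = sum(partials)        # 15 == true sum of the first 5 events
```

Case 2 overcounts because each forwarded value already contains every earlier event; case 3 stays correct because each window partial covers disjoint events.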
*The partial aggregation results from the same key will be aggregated globally.*

So the first case and the third case get the *same* result; the difference is the output style and the latency.

Generally speaking, the localKeyBy API is just an optimization API. We do not limit its usage, but users have to understand its semantics and use it correctly.

Best,
Vino

Hequn Cheng <chenghe...@gmail.com> wrote on Mon, Jun 17, 2019, 4:18 PM:

Hi Vino,

Thanks for the proposal, I think it is a very good feature!

One thing I want to make sure of is the semantics of `localKeyBy`. From the document, the `localKeyBy` API returns an instance of `KeyedStream`, which can also perform sum(), so what are the semantics of `localKeyBy()` in this case? For example, will the following code produce the same results, and what are the differences between them?

1. input.keyBy(0).sum(1)
2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

It would also be great if we could add this to the document. Thank you very much.

Best, Hequn

On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Aljoscha,

I have looked at the "*Process*" section of the FLIP wiki page.[1] This mail thread indicates that it has proceeded to the third step.

When I looked at the fourth step (the vote step), I didn't find the prerequisites for starting the voting process.

Considering that the discussion of this feature has already been done in the old thread [2], can you tell me when I should start voting? Can I start now?
Best,
Vino

[1]: https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
[2]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

leesf <leesf0...@gmail.com> wrote on Thu, Jun 13, 2019, 9:19 AM:

+1 for the FLIP, thank you Vino for your efforts.

Best,
Leesf

vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 12, 2019, 5:46 PM:

Hi folks,

I would like to start the FLIP discussion thread about supporting local aggregation in Flink.

In short, this feature can effectively alleviate data skew.
This is the FLIP:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink

*Motivation* (copied from the FLIP)

Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on the elements that have the same key. When executed at runtime, the elements with the same key will be sent to and aggregated by the same task.

The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, the performance will be significantly downgraded.
Even worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.

Local aggregation is a widely adopted method to mitigate the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. Then, in the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result. Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. Besides, by reducing the amount of transferred data, the performance can be further improved.
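The two-phase decomposition described above can be sketched in plain Python. This is a simulation for illustration only; the skewed stream, the four senders, and the variable names are invented assumptions:

```python
# Sketch (plain Python): the two-phase decomposition. With skewed keys,
# phase 1 caps what any receiver sees at (number of senders) partials per
# key, instead of one record per event.

from collections import Counter

# A skewed stream: key "hot" dominates. Split round-robin across 4 senders.
stream = [("hot", 1)] * 1000 + [("cold", 1)] * 10
senders = [stream[i::4] for i in range(4)]

# Phase 1: each sender pre-aggregates locally per key.
partials = []
for shard in senders:
    local = Counter()
    for key, value in shard:
        local[key] += value
    partials.extend(local.items())

# Phase 2: partials are shuffled by key and combined into the final result.
final = Counter()
for key, value in partials:
    final[key] += value

# The receiver owning "hot" gets at most one partial per sender (4 here)
# instead of 1000 raw records.
records_at_hot_receiver = sum(1 for k, _ in partials if k == "hot")
```

The final counts are unchanged by the decomposition, while the load on the receiver of the hot key drops from 1000 records to 4 partials.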
*More details*:

Design documentation:
https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Old discussion thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>

We are looking forward to your feedback!

Best,
Vino