Hi Vino,

So the difference between `DataStream.localKeyBy().process()` and `DataStream.process()` is that the former can access keyed state while the latter can only access operator state. I think that is out of scope for designing a local aggregation API. It would rather be an extension of the state API, i.e. local keyed state. The difference between local keyed state and operator state (if I understand correctly) is that local keyed state can be backed by RocksDB? Or that it makes "keyed state" local? IMO, it's a larger topic than local aggregation and should be discussed separately. I have cc'ed people who work on state, @Tzu-Li (Gordon) Tai <tzuli...@apache.org>, @Seth and @Yu Li, to give some feedback from the state perspective.
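
To make the distinction above concrete, here is a minimal sketch in plain Java. It is not Flink code: `KeyedValueState`, `localCount` and `mergeCounts` are hypothetical names made up for illustration, and the sketch only assumes that "local keyed state" means state scoped per key within a single subtask, before any shuffle. Under that assumption the same key can hold partial state in several subtasks, so a step after a real keyBy still has to merge the partial results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalKeyedStateSketch {

    /** Toy stand-in for keyed state: the "runtime" sets the current key before each record. */
    static final class KeyedValueState<K, V> {
        private final Map<K, V> perKey = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) { this.currentKey = key; }
        V value() { return perKey.get(currentKey); }
        void update(V v) { perKey.put(currentKey, v); }
        Map<K, V> snapshot() { return new HashMap<>(perKey); }
    }

    /** One upstream subtask (assumption: no shuffle yet) counting its own records per key. */
    static Map<String, Long> localCount(List<String> recordKeys) {
        KeyedValueState<String, Long> state = new KeyedValueState<>();
        for (String key : recordKeys) {
            state.setCurrentKey(key);              // scoping is done for the user function
            Long current = state.value();
            state.update(current == null ? 1L : current + 1L);
        }
        return state.snapshot();
    }

    /** Downstream step after a real keyBy: merges the partial counts of all subtasks. */
    static Map<String, Long> mergeCounts(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((k, v) -> merged.merge(k, v, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(localCount(Arrays.asList("a", "a", "b"))); // subtask 1
        partials.add(localCount(Arrays.asList("a", "c")));      // subtask 2
        System.out.println(mergeCounts(partials));
    }
}
```

In this toy model key "a" has partial state in both subtasks, which is why the local result alone is not a final answer and why the questions about backing and merging such state belong to a state discussion.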

Regarding the API design updated in your FLIP, I have some concerns:

1) The "localKeyBy()" method returns a "KeyedStream", which exposes all of its methods. However, not every method makes sense or has a clear definition on a local stream, for example "countWindow(long, long)", "timeWindow(long, long)", "window(WindowAssigner)", and the "intervalJoin" Hequn mentioned before. I would suggest we expose only the APIs we need for local aggregation and leave the others for later. We can return a "LocalKeyedStream" that exposes only some dedicated methods, for example "aggregate()" and "trigger()". These APIs do not need to expose local keyed state to support local aggregation.

2) I think `localKeyBy().process()` is a "local process", not just a "local aggregate". It needs more discussion about local keyed state, and I would like to keep it out of this FLIP.

Regards,
Jark

On Thu, 27 Jun 2019 at 13:03, vino yang <yanghua1...@gmail.com> wrote:

> Hi all,
>
> I also think it's a good idea to agree on the API level first.
>
> I am sorry that we did not give usage examples of the API in the FLIP documentation before. This may have caused some misunderstandings in this mail thread.
>
> So I have now added some usage examples to the "Public Interfaces" section of the FLIP-44 documentation.
>
> Let us first get to know the API through its usage examples.
>
> Please let me know if you have any feedback or questions.
>
> Best,
> Vino
>
> On Thu, Jun 27, 2019 at 12:51 PM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> `DataStream.localKeyBy().process()` has a key difference from `DataStream.process()`. The former receives a `KeyedProcessFunction` (sorry, my previous reply may have misled you), while the latter receives a `ProcessFunction`.
>> When you read the Javadoc of ProcessFunction, you can find a "*Note*" statement:
>>
>>> Access to keyed state and timers (which are also scoped to a key) is only available if the ProcessFunction is applied on a KeyedStream.
>>
>> In addition, you can compare their two implementations (`ProcessOperator` and `KeyedProcessOperator`) to see the difference.
>>
>> IMO, the "Note" statement matters a lot in many scenarios. For example, if we cannot access keyed state, we can only use heap memory to buffer data, which does not guarantee correct semantics! Timers are also very important in some scenarios.
>>
>> That's why we say our API is flexible: it gets most of the benefits (and potential future benefits) of KeyedStream.
>>
>> I have added some instructions on the use of localKeyBy to the FLIP-44 documentation.
>>
>> Best,
>> Vino
>>
>> On Thu, Jun 27, 2019 at 10:44 AM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Piotr,
>>>
>>> I think the state migration issue you raised is a good point. Having "stream.enableLocalAggregation(Trigger)" might add implicit operators for which users can't set a uid, causing state compatibility/evolution problems. So let's put this in rejected alternatives.
>>>
>>> Hi Vino,
>>>
>>> You mentioned several times that "DataStream.localKeyBy().process()" can solve the data skew problem of "DataStream.keyBy().process()". I'm curious: what is the difference between "DataStream.process()" and "DataStream.localKeyBy().process()"? Can't "DataStream.process()" solve the data skew problem?
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 26 Jun 2019 at 18:20, Piotr Nowojski <pi...@ververica.com> wrote:
>>>
>>>> Hi Jark and Vino,
>>>>
>>>> I agree fully with Jark that, in order to keep the discussion focused and to limit the number of parallel topics, we should first focus on one topic.
>>>> We can first decide on the API and discuss the runtime details later, at least as long as we keep the potential requirements of the runtime part in mind while designing the API.
>>>>
>>>> Regarding the automatic optimisation proposed by Jark:
>>>>
>>>> "stream.enableLocalAggregation(Trigger)"
>>>>
>>>> I would be against that in the DataStream API for the reasons that Vino presented. There was a discussion thread about future directions of Table API vs DataStream API and the consensus was that automatic optimisations are one of the dividing lines between those two, for at least a couple of reasons. Flexibility and full control over the program was one of them. Another is state migration. Having "stream.enableLocalAggregation(Trigger)" that might add some implicit operators in the job graph can cause problems with savepoint/checkpoint compatibility.
>>>>
>>>> However, I haven't thought about or looked into the details of Vino's API proposal, so I cannot fully judge it.
>>>>
>>>> Piotrek
>>>>
>>>> > On 26 Jun 2019, at 09:17, vino yang <yanghua1...@gmail.com> wrote:
>>>> >
>>>> > Hi Jark,
>>>> >
>>>> > Similar questions and responses have been repeated many times.
>>>> >
>>>> > Why didn't we spend more sections discussing the API?
>>>> >
>>>> > Because we try to reuse the abilities of KeyedStream. The localKeyBy API just returns a KeyedStream; that's our design. We get all the benefits of KeyedStream and further benefits from WindowedStream. The APIs that come from KeyedStream and WindowedStream are long-tested and flexible. Yes, we spent much space discussing the local keyed state, but that's not the goal or motivation; it's the way to implement local aggregation. It is much more complicated than the API we introduced, so we spent more sections on it. Of course, this is at the implementation level of the operator.
>>>> > We also agreed to support the buffer+flush implementation and added related instructions to the documentation. This needs to wait for the community's approval; if the community agrees, we will give more instructions. What's more, I have indicated before that we welcome state-related commenters to participate in the discussion, but it is not wise to modify the FLIP title.
>>>> >
>>>> > About the API of local aggregation:
>>>> >
>>>> > I don't dispute that ease of use is very important. But IMHO flexibility is the most important thing at the DataStream API level. Otherwise, what does DataStream mean? The significance of the DataStream API is that it is more flexible than Table/SQL; if it cannot provide this, everyone would just use Table/SQL.
>>>> >
>>>> > The DataStream API should focus more on flexibility than on automatic optimization, which gives users more possibilities to implement complex programs and meet specific scenarios. Many programs written with the DataStream API are far more complex than we think. It is very difficult to optimize them at the API level and the benefit is very low.
>>>> >
>>>> > I want to say that we support a more generalized local aggregation. I mentioned in a previous reply that aggregation is not limited to UDFs that implement AggregateFunction. In some complex scenarios, we have to support local aggregation through ProcessFunction and ProcessWindowFunction to solve the data skew problem. How would you support them in the API implementation and optimization you mentioned?
>>>> >
>>>> > That flexible APIs can be arbitrarily combined into erroneous semantics does not prove that flexibility is meaningless, because the user is the decision maker.
>>>> > I have given examples many times: for many APIs in DataStream, if we combine them arbitrarily, the result does not have much practical significance either. So users of flexible APIs need to understand what they are doing and what the right choice is.
>>>> >
>>>> > I think that if we keep discussing this, there will be no result.
>>>> >
>>>> > @Stephan Ewen <se...@apache.org>, @Aljoscha Krettek <aljos...@apache.org> and @Piotr Nowojski <pi...@ververica.com>, do you have further comments?
>>>> >
>>>> > On Wed, Jun 26, 2019 at 11:46 AM Jark Wu <imj...@gmail.com> wrote:
>>>> >
>>>> > Thanks for the long discussion Vino, Kurt, Hequn, Piotr and others,
>>>> >
>>>> > It seems that we still have some different ideas about the API (localKeyBy()?) and the implementation details (reuse window operator? local keyed state?). The discussion has stalled and mixes motivation, API and implementation.
>>>> >
>>>> > In order to make some progress on this topic, I want to summarize the points (please correct me if I'm wrong or missing something) and would suggest splitting the topic into the following aspects and discussing them one by one.
>>>> >
>>>> > 1) What's the main purpose of this FLIP?
>>>> > - From the title of this FLIP, it is to support local aggregation. However, about 80% of the FLIP's content introduces a new state called local keyed state.
>>>> > - If we mainly want to introduce local keyed state, then we should re-title the FLIP and involve more people who work on state.
>>>> > - If we mainly want to support local aggregation, then we can jump to step 2 to discuss the API design.
>>>> >
>>>> > 2) What does the API look like?
>>>> > - Vino proposed to use "localKeyBy()" to do local processing; the output of the local process is the result type of the aggregate function.
>>>> >   a) For non-windowed aggregate: input.localKeyBy(..).aggregate(agg1).keyBy(..).aggregate(agg2) **NOT SUPPORTED**
>>>> >   b) For windowed aggregate: input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)
>>>> >
>>>> > 3) What are the implementation details?
>>>> > - may or may not reuse the window operator.
>>>> > - may or may not introduce a new state concept.
>>>> > - may not have state in the local operator, by flushing buffers in prepareSnapshotPreBarrier
>>>> > - and so on...
>>>> > - we can discuss these later, once we reach a consensus on the API
>>>> >
>>>> > --------------------
>>>> >
>>>> > Here are my thoughts:
>>>> >
>>>> > 1) Purpose of this FLIP
>>>> > - From the motivation section in the FLIP, I think the purpose is to support local aggregation to solve the data skew issue. Then I think we should focus on how to provide an easy-to-use and clear API to support **local aggregation**.
>>>> > - Vino's point is centered around the local keyed state API (or localKeyBy()), and how to leverage the local keyed state API to support local aggregation. But I'm afraid it's not a good way to design an API for local aggregation.
>>>> >
>>>> > 2) Local aggregation API
>>>> > - IMO, the method call chain "input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)" is not that easy to use, because we have to provide two implementations for one aggregation (one for the partial agg, another for the final agg). And we have to take care with the first window call: an inappropriate window call will break the semantics.
>>>> > - From my point of view, local aggregation is a mature concept which should output the intermediate accumulator (ACC) for the past period of time (a trigger).
>>>> >   And the downstream final aggregation will merge the ACCs received from the local side and output the current final result.
>>>> > - The current "AggregateFunction" API in DataStream already has the accumulator type and a "merge" method. So the only thing the user needs to do is enable the local aggregation optimization and set a trigger.
>>>> > - One idea that comes to mind: assume we have a windowed aggregation stream, "val stream = input.keyBy().window(w).aggregate(agg)". We can provide an API on the stream, for example "stream.enableLocalAggregation(Trigger)", where the trigger can be "ContinuousEventTimeTrigger.of(Time.minutes(1))". Then it will be optimized into a local operator + final operator, and the local operator will combine records every minute on event time.
>>>> > - In this way, only one line is added, and the output is the same as before, because it is just an optimization.
>>>> >
>>>> > Regards,
>>>> > Jark
>>>> >
>>>> > On Tue, 25 Jun 2019 at 14:34, vino yang <yanghua1...@gmail.com> wrote:
>>>> >
>>>> > > Hi Kurt,
>>>> > >
>>>> > > To answer your questions:
>>>> > >
>>>> > > a) Sorry, I have only updated the Google doc and have not had time to update the FLIP yet; I will update the FLIP as soon as possible. About your description on this point, I have a question: what does "how do we combine with `AggregateFunction`" mean?
>>>> > >
>>>> > > I have shown you the examples which Flink has supported:
>>>> > >
>>>> > > - input.localKeyBy(0).aggregate()
>>>> > > - input.localKeyBy(0).window().aggregate()
>>>> > >
>>>> > > Can you show me an example of how we would combine with `AggregateFunction` through your localAggregate API?
>>>> > >
>>>> > > About the example of how to do local aggregation for AVG, consider this code:
>>>> > >
>>>> > > DataStream<Tuple2<String, Long>> source = null;
>>>> > > source
>>>> > >     .localKeyBy(0)
>>>> > >     .timeWindow(Time.seconds(60))
>>>> > >     .aggregate(agg1, new WindowFunction<Tuple2<Long, Long>, Tuple3<String, Long, Long>, String, TimeWindow>() {})
>>>> > >     .keyBy(0)
>>>> > >     .timeWindow(Time.seconds(60))
>>>> > >     .aggregate(agg2, new WindowFunction<Tuple2<Long, Long>, Tuple2<String, Long>, String, TimeWindow>());
>>>> > >
>>>> > > agg1:
>>>> > > signature: new AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {}
>>>> > > input param type: Tuple2<String, Long>, f0: key, f1: value
>>>> > > intermediate result type: Tuple2<Long, Long>, f0: local aggregated sum; f1: local aggregated count
>>>> > > output param type: Tuple2<Long, Long>, f0: local aggregated sum; f1: local aggregated count
>>>> > >
>>>> > > agg2:
>>>> > > signature: new AggregateFunction<Tuple3<String, Long, Long>, Long, Tuple2<String, Long>>() {}
>>>> > > input param type: Tuple3<String, Long, Long>, f0: key, f1: local aggregated sum; f2: local aggregated count
>>>> > > intermediate result type: Long, avg result
>>>> > > output param type: Tuple2<String, Long>, f0: key, f1: avg result
>>>> > >
>>>> > > For a sliding window, we just need to change the window type if users want that. Again, we try to give the design and implementation at the DataStream level. So I believe we can match all the requirements that come from the SQL level (it's just that the implementation may be different).
>>>> > >
>>>> > > b) Yes, theoretically, your thought is right. But in reality, it cannot bring many benefits.
>>>> > > If we want the benefits of the window API, why would we not reuse the window operator, and instead copy so much duplicated code into another operator?
>>>> > >
>>>> > > c) OK, I agree to let the state backend committers join this discussion.
>>>> > >
>>>> > > Best,
>>>> > > Vino
>>>> > >
>>>> > > On Mon, Jun 24, 2019 at 6:53 PM Kurt Young <ykt...@gmail.com> wrote:
>>>> > >
>>>> > > > Hi vino,
>>>> > > >
>>>> > > > One thing to add: for a), I think one or two examples, such as how to do local aggregation on a sliding window and how to do local aggregation on an unbounded aggregate, would help a lot.
>>>> > > >
>>>> > > > Best,
>>>> > > > Kurt
>>>> > > >
>>>> > > > On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com> wrote:
>>>> > > >
>>>> > > > > Hi vino,
>>>> > > > >
>>>> > > > > I think there are several things that still need discussion.
>>>> > > > >
>>>> > > > > a) We all agree that we should first go with a unified abstraction, but the abstraction is not reflected in the FLIP. If your answer is the "localKeyBy" API, then I would ask how we combine it with `AggregateFunction`, and how we do proper local aggregation for aggregates that have a different intermediate result type, like AVG. Could you add these to the document?
>>>> > > > >
>>>> > > > > b) From the implementation side, reusing the window operator is one of the possible solutions, but that does not mean we base both implementations on the window operator. My understanding is that one of the possible implementations should not touch the window operator.
>>>> > > > >
>>>> > > > > c) 80% of your FLIP content is actually describing how we support local keyed state.
>>>> > > > > I don't know if this is necessary to introduce in the first step, and we should also involve committers who work on state backends to share their thoughts.
>>>> > > > >
>>>> > > > > Best,
>>>> > > > > Kurt
>>>> > > > >
>>>> > > > > On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >
>>>> > > > >> Hi Kurt,
>>>> > > > >>
>>>> > > > >> You did not raise any further differing opinions, so I thought you had agreed with the design after we promised to support two kinds of implementation.
>>>> > > > >>
>>>> > > > >> At the API level, we have answered your question about passing an AggregateFunction to do the aggregation. Whether or not we introduce the localKeyBy API, we can support AggregateFunction.
>>>> > > > >>
>>>> > > > >> So what's your different opinion now? Can you share it with us?
>>>> > > > >>
>>>> > > > >> Best,
>>>> > > > >> Vino
>>>> > > > >>
>>>> > > > >> On Mon, Jun 24, 2019 at 4:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>> > > > >>
>>>> > > > >> > Hi vino,
>>>> > > > >> >
>>>> > > > >> > Sorry, I don't see a consensus about reusing the window operator and keeping the API design of localKeyBy. But I think we should definitely give this topic more thought.
>>>> > > > >> >
>>>> > > > >> > I will also try to loop in Stephan for this discussion.
>>>> > > > >> >
>>>> > > > >> > Best,
>>>> > > > >> > Kurt
>>>> > > > >> >
>>>> > > > >> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> >
>>>> > > > >> > > Hi all,
>>>> > > > >> > >
>>>> > > > >> > > I am happy that we have had a wonderful discussion and received many valuable opinions in the last few days.
>>>> > > > >> > >
>>>> > > > >> > > Now, let me try to summarize the changes to the design on which we have reached consensus:
>>>> > > > >> > >
>>>> > > > >> > > - provide a unified abstraction to support two kinds of implementation;
>>>> > > > >> > > - reuse WindowOperator and try to enhance it so that the intermediate result of the local aggregation can be buffered and flushed, to support two kinds of implementation;
>>>> > > > >> > > - keep the API design of localKeyBy, but declare as disabled the APIs we cannot currently support, and provide a configurable API for users to choose how to handle the intermediate result;
>>>> > > > >> > >
>>>> > > > >> > > The above three points have been updated in the design doc. If you have any questions, please let me know.
>>>> > > > >> > >
>>>> > > > >> > > @Aljoscha Krettek <aljos...@apache.org> What do you think? Any further comments?
>>>> > > > >> > >
>>>> > > > >> > > Best,
>>>> > > > >> > > Vino
>>>> > > > >> > >
>>>> > > > >> > > On Thu, Jun 20, 2019 at 2:02 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> > >
>>>> > > > >> > > > Hi Kurt,
>>>> > > > >> > > >
>>>> > > > >> > > > Thanks for your comments.
>>>> > > > >> > > >
>>>> > > > >> > > > It seems we have come to a consensus that we should alleviate the performance degradation caused by data skew with local aggregation. In this FLIP, our key solution is to introduce local keyed partition to achieve this goal.
>>>> > > > >> > > >
>>>> > > > >> > > > I also agree that we can benefit a lot from the usage of AggregateFunction. In combination with localKeyBy, we can easily use it to achieve local aggregation:
>>>> > > > >> > > >
>>>> > > > >> > > > - input.localKeyBy(0).aggregate()
>>>> > > > >> > > > - input.localKeyBy(0).window().aggregate()
>>>> > > > >> > > >
>>>> > > > >> > > > I think the only problem here is the choice between
>>>> > > > >> > > >
>>>> > > > >> > > > - (1) Introducing a new primitive called localKeyBy and implementing local aggregation with existing operators, or
>>>> > > > >> > > > - (2) Introducing an operator called localAggregation, which is composed of a key selector, a window-like operator, and an aggregate function.
>>>> > > > >> > > >
>>>> > > > >> > > > There may be some optimization opportunities in providing a composite interface for local aggregation. But at the same time, in my opinion, we lose flexibility (or we need some effort to achieve the same flexibility).
>>>> > > > >> > > >
>>>> > > > >> > > > As said in the previous mails, we have many use cases where the aggregation is very complicated and cannot be performed with AggregateFunction.
>>>> > > > >> > > > For example, users may perform windowed aggregations according to time, data values, or even external storage. Typically, they now use KeyedProcessFunction or customized triggers to implement these aggregations. It's not easy to address data skew in such cases with a composite interface for local aggregation.
>>>> > > > >> > > >
>>>> > > > >> > > > Given that the DataStream API is targeted exactly at these cases, where the application logic is very complicated and optimization does not matter, I think it's a better choice to provide a relatively low-level and canonical interface.
>>>> > > > >> > > >
>>>> > > > >> > > > The composite interface, on the other hand, may be a good choice in declarative interfaces, including SQL and Table API, as it allows more optimization opportunities.
>>>> > > > >> > > >
>>>> > > > >> > > > Best,
>>>> > > > >> > > > Vino
>>>> > > > >> > > >
>>>> > > > >> > > > On Thu, Jun 20, 2019 at 10:15 AM Kurt Young <ykt...@gmail.com> wrote:
>>>> > > > >> > > >
>>>> > > > >> > > >> Hi all,
>>>> > > > >> > > >>
>>>> > > > >> > > >> As vino said in previous emails, I think we should first discuss and decide what kind of use cases this FLIP wants to resolve, and what the API should look like. From my side, I think this is probably the root cause of the current divergence.
>>>> > > > >> > > >>
>>>> > > > >> > > >> My understanding (from the FLIP title and the motivation section of the document) is that we want proper support for local aggregation, or pre-aggregation. This is not a very new idea; most SQL engines have already made this improvement. The core concept is that there should be an AggregateFunction, no matter whether it's a Flink runtime AggregateFunction or SQL's UserDefinedAggregateFunction. Both kinds of aggregation have the concept of an intermediate data type, which we sometimes call ACC. I quickly went through the POC Piotr did before [1]; it also directly uses AggregateFunction.
>>>> > > > >> > > >>
>>>> > > > >> > > >> But the thing is, after reading the design of this FLIP, I can't help feeling that this FLIP is not aiming at proper local aggregation support. It actually wants to introduce another concept, LocalKeyBy: how to split and merge local key groups, and how to properly support state on local keys. Local aggregation just happens to be one possible use case of LocalKeyBy. But it lacks support for the essential concept of local aggregation, which is the intermediate data type. Without this, I really don't think it is a good fit for local aggregation.
>>>> > > > >> > > >>
>>>> > > > >> > > >> Here I want to make sure of the scope or goal of this FLIP: do we want a proper local aggregation engine, or do we just want to introduce a new concept called LocalKeyBy?
>>>> > > > >> > > >>
>>>> > > > >> > > >> [1]: https://github.com/apache/flink/pull/4626
>>>> > > > >> > > >>
>>>> > > > >> > > >> Best,
>>>> > > > >> > > >> Kurt
>>>> > > > >> > > >>
>>>> > > > >> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> > > >>
>>>> > > > >> > > >> > Hi Hequn,
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Thanks for your comments!
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > I agree that allowing local aggregation to reuse the window API and refining the window operator so that it matches both requirements (ours and Kurt's) is a good decision!
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Concerning your questions:
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may be meaningless.
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Yes, it does not make sense in most cases. However, I also want to note that users should know the right semantics of localKeyBy and use it correctly.
>>>> > > > >> > > >> > This issue also exists for the global keyBy. Consider this example: input.keyBy(0).sum(1).keyBy(0).sum(1); the result is also meaningless.
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > 2. About the semantics of input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Good catch! I agree with you that it's not good to enable all KeyedStream functionality for localKeyBy. Currently, we do not support some APIs such as connect/join/intervalJoin/coGroup. This is because we force the operators on LocalKeyedStreams to be chained with their inputs.
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Best,
>>>> > > > >> > > >> > Vino
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > On Wed, Jun 19, 2019 at 3:42 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > > Hi,
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > Thanks a lot for the great discussion, and it is great to see that some agreement has been reached on the "local aggregate engine"!
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > ===> Considering the abstract engine,
>>>> > > > >> > > >> > > I'm wondering whether it is valuable for us to extend the current window to meet both demands raised by Kurt and Vino. There are some benefits we can get:
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > 1. The interfaces of the window are complete and clear.
>>>> > > > >> > > >> > > With windows, we can define a lot of ways to split the data and perform different computations.
>>>> > > > >> > > >> > > 2. We can also leverage the window to do miniBatch for the global aggregation, i.e., we can use the window to bundle data belonging to the same key, so that for every bundle we only need to read and write state once. This can greatly reduce state IO and improve performance.
>>>> > > > >> > > >> > > 3. A lot of other use cases can also benefit from a window based on memory or a stateless window.
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > ===> As for the API,
>>>> > > > >> > > >> > > I think it is good to make our API more flexible. However, we may need to make our API meaningful.
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > Take my previous reply as an example: input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result may be meaningless. Another example I found is the intervalJoin, e.g., input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In this case, it will cause problems if input1 and input2 have different parallelism. We don't know which input the join should be chained with.
Even if they share the same parallelism, it's hard to tell what the join is doing. There may be some other problems as well.

From this point of view, it's at least not good to enable all functionalities of KeyedStream for localKeyBy?

Great to also have your opinions.

Best, Hequn

On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt and Piotrek,

Thanks for your comments.

I agree that we can provide a better abstraction to be compatible with two different implementations.

First of all, I think we should consider what kind of scenarios we need to support at the *API* level.

We have some use cases which need customized aggregation through KeyedProcessFunction (in the usage of our localKeyBy.window they can use ProcessWindowFunction).
Shall we support these flexible use scenarios?

Best,
Vino

Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 8:37 PM:

Hi Piotr,

Thanks for joining the discussion. Making "local aggregation" abstract enough sounds good to me; we could implement and verify alternative solutions for the use cases of local aggregation. Maybe we will find that both solutions are appropriate for different scenarios.

Starting from a simple one sounds like a practical way to go. What do you think, vino?

Best,
Kurt

On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi Kurt and Vino,

I think there is a trade-off that we need to consider for the local aggregation.
Generally speaking, I would agree with Kurt about local aggregation/pre-aggregation not using Flink's state and instead flushing the operator on a checkpoint. Network IO is usually cheaper than disk IO. This has, however, a couple of issues:
1. It can explode the number of in-flight records during checkpoint barrier alignment, making checkpointing slower and decreasing the actual throughput.
2. This trades disk IO on the local aggregation machine for CPU (and disk IO in case of RocksDB) on the final aggregation machine. This is fine, as long as there is no huge data skew. If there is only a handful of hot keys (or even one single hot key), it might be better to keep the persistent state in the LocalAggregationOperator to offload the final aggregation as much as possible.
3. With frequent checkpointing, local aggregation effectiveness would degrade.
I assume Kurt is correct that in your use cases the stateless operator was behaving better, but I could easily see other use cases as well. For example, someone is already using RocksDB, and their job is bottlenecked on a single window operator instance because of the data skew. In that case stateful local aggregation would probably be a better choice.

Because of that, I think we should eventually provide both versions, and in the initial version we should at least make the "local aggregation engine" abstract enough that one could easily provide a different implementation strategy.
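One way to picture an engine that is "abstract enough" for both strategies is the Java sketch below. It is only an illustration: `LocalAggEngines`, `Engine`, `beforeBarrier` and `snapshot` are hypothetical names that do not exist in Flink, aggregation is reduced to a per-key sum, and trigger-based emission is left out. The stateless variant drains its heap buffer before a barrier passes and persists nothing; the stateful variant emits nothing at the barrier and hands its partial sums to the checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

class LocalAggEngines {
    /** Hypothetical abstraction over the two strategies discussed in the thread. */
    interface Engine {
        void add(String key, long value);
        /** Partials to send downstream before a checkpoint barrier is forwarded. */
        Map<String, Long> beforeBarrier();
        /** Partials that would have to be persisted in the checkpoint. */
        Map<String, Long> snapshot();
    }

    /** Stateless strategy: flush everything downstream, persist nothing. */
    static final class FlushingEngine implements Engine {
        private Map<String, Long> buffer = new HashMap<>();

        public void add(String key, long value) {
            buffer.merge(key, value, Long::sum);
        }

        public Map<String, Long> beforeBarrier() {
            Map<String, Long> out = buffer;
            buffer = new HashMap<>(); // start a fresh buffer after the flush
            return out;
        }

        public Map<String, Long> snapshot() {
            return new HashMap<>(); // nothing left to persist
        }
    }

    /** Stateful strategy: emit nothing at the barrier, persist the partial sums. */
    static final class StatefulEngine implements Engine {
        private final Map<String, Long> state = new HashMap<>();

        public void add(String key, long value) {
            state.merge(key, value, Long::sum);
        }

        public Map<String, Long> beforeBarrier() {
            return new HashMap<>(); // partials stay local
        }

        public Map<String, Long> snapshot() {
            return new HashMap<>(state); // defensive copy for the checkpoint
        }
    }
}
```

With an interface of this shape, the choice between "flush on checkpoint" and "keep persistent state" becomes a matter of which implementation is plugged in, which is the property the paragraph above asks for.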
Piotrek

On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote:

Hi,

For the trigger, it depends on what operator we want to use under the API. If we choose to use the window operator, we should also use the window's trigger. However, I also think reusing the window operator for this scenario may not be the best choice. The reasons are the following:

1. As a lot of people already pointed out, the window relies heavily on state and it will definitely affect performance. You can argue that one can use a heap-based state backend, but this will introduce extra coupling, especially when we have a chance to design a purely stateless operator.
2. The window operator is *the most* complicated operator Flink currently has.
Maybe we only need to pick a subset of the window operator to achieve the goal, but once the user wants to have a deep look at the localAggregation operator, it's still hard to find out what's going on under the window operator. For simplicity, I would also recommend we introduce a dedicated lightweight operator, which is also much easier for a user to learn and use.

For your question about increasing the burden in `StreamOperator::prepareSnapshotPreBarrier()`, the only thing this function needs to do is output all the partial results; it's a purely CPU workload, not introducing any IO. I want to point out that even if we have this cost, we remove another barrier alignment cost of the operator, which is the sync flush stage of the state, if you introduced state.
This flush will actually introduce disk IO, and I think it's worth exchanging this cost for a purely CPU workload. And we do have some observations about these two behaviors (as I said before, we actually implemented both solutions): the stateless one actually performs better both in performance and barrier alignment time.

Best,
Kurt

On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt,

Thanks for your example. Now it looks clearer to me.

From your example code snippet, I saw the localAggregate API has three parameters:

1. key field
2. PartitionAvg
3. CountTrigger: Does this trigger come from the window package?
I will compare our design and yours at the API and operator levels:

*From the API level:*

As I replied to @dianfu in the old email thread [1], the Window API can provide the second and the third parameter right now.

If you reuse a specific interface or class, such as *Trigger* or *CountTrigger* provided by the window package, but do not use the window API, it's not reasonable. And if you do not reuse these interfaces or classes, you would need to introduce more things, yet they would look similar to the things provided by the window package.

The window package already provides several types of windows and many triggers, and lets users customize them. What's more, users are more familiar with the Window API.
This is the reason why we just provide the localKeyBy API and reuse the window API. It avoids unnecessary components such as triggers and the buffering mechanism (based on count or time). And it has clear and easy-to-understand semantics.

*From the operator level:*

We reused the window operator, so we can get all the benefits from state and checkpointing.

In your design, you call the operator under the localAggregate API a *stateless* operator. IMO, it still holds state; it is just not Flink-managed state.
About the memory buffer (I think it's still not very clear; if you have time, can you give more detailed information or answer my questions?), I have some questions:

- If it is just a raw JVM heap memory buffer, how do you support fault tolerance if the job is configured with the EXACTLY-ONCE semantic guarantee?
- You think the memory buffer (non-Flink state) has better performance. In our design, users can also configure the HEAP state backend to get performance close to your mechanism.
- `StreamOperator::prepareSnapshotPreBarrier()` is related to the timing of the snapshot. IMO, the flush action should be a synchronized action? (If not, please point out my mistake.) I still think we should not depend on the timing of the checkpoint.
  Checkpoint-related operations are inherently performance sensitive; we should not increase their burden any further. Our implementation is based on the mechanism of Flink's checkpoint, which can benefit from the async snapshot and incremental checkpoint. IMO, the performance is not a problem, and we also have not found a performance issue in our production.

[1]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

Best,
Vino

Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 2:27 PM:

Yeah, sorry for not expressing myself clearly.
I will try to provide more details to make sure we are on the same page.

For the DataStream API, it shouldn't be optimized automatically. You have to explicitly call the API to do local aggregation, as well as specify the trigger policy of the local aggregation. Take average for example; the user program may look like this (just a draft):

Assuming the input type is DataStream<Tuple2<String, Int>>:

    ds.localAggregate(
            0,                       // The local key, which is the String from Tuple2
            PartitionAvg(1),         // The partial aggregation function, produces Tuple2<Long, Int>,
                                     // indicating sum and count
            CountTrigger.of(1000L)   // Trigger policy; note this should be best effort, and can also be
                                     // composed with a time-based or memory-size-based trigger
        )                            // The return type is the local aggregate Tuple2<String, Tuple2<Long, Int>>
        .keyBy(0)                    // Further keyBy it with the required key
        .aggregate(1)                // This will merge all the partial results and get the final average.

(This is only a draft, only trying to explain what it looks like.)

The local aggregate operator can be stateless; we can keep a memory buffer or other efficient data structure to improve the aggregate performance.

Let me know if you have any other questions.
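The two-phase average in the draft above can be tried out without Flink. The following is a minimal plain-Java sketch under stated assumptions: `LocalAvgSketch`, `LocalAggregator`, `Partial` and `finalAverage` are hypothetical names, the count-based flush is only a stand-in for `CountTrigger.of(...)`, and the shuffle between the local and the final stage is replaced by a plain list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LocalAvgSketch {
    /** Partial result for one key: running sum and count. */
    static final class Partial {
        final String key;
        long sum;
        long count;

        Partial(String key, long sum, long count) {
            this.key = key;
            this.sum = sum;
            this.count = count;
        }
    }

    /** Hypothetical stand-in for the local operator: buffers partials in heap memory. */
    static final class LocalAggregator {
        private final Map<String, Partial> buffer = new HashMap<>();
        private final List<Partial> emitted = new ArrayList<>();
        private final int flushEvery;
        private int seen;

        LocalAggregator(int flushEvery) {
            this.flushEvery = flushEvery;
        }

        void add(String key, int value) {
            Partial p = buffer.computeIfAbsent(key, k -> new Partial(k, 0, 0));
            p.sum += value;
            p.count++;
            // Count-based trigger: flush once flushEvery records have been seen.
            if (++seen >= flushEvery) {
                flush();
            }
        }

        /** Emits all buffered partials downstream and clears the buffer. */
        void flush() {
            emitted.addAll(buffer.values());
            buffer.clear();
            seen = 0;
        }

        List<Partial> emitted() {
            return emitted;
        }
    }

    /** Stand-in for the final keyed stage: merges partials per key into the average. */
    static Map<String, Double> finalAverage(List<Partial> partials) {
        Map<String, long[]> merged = new HashMap<>();
        for (Partial p : partials) {
            long[] acc = merged.computeIfAbsent(p.key, k -> new long[2]);
            acc[0] += p.sum;
            acc[1] += p.count;
        }
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, long[]> e : merged.entrySet()) {
            result.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }
}
```

The point of carrying (sum, count) rather than a local average is that partials from different flushes or different subtasks can be merged exactly; averaging local averages would give a wrong result whenever the counts differ.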
Best,
Kurt

On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt,

Thanks for your reply.

Actually, I am not against you raising your design.

From your description before, I can only imagine that your high-level implementation is about SQL and the optimization is internal to the API. Is it automatic? How do you provide the configuration option for triggering pre-aggregation?

Maybe after I get more information, it will sound more reasonable.

IMO, first of all, it would be better to make your user interface concrete; it's the basis of the discussion.
For example, can you give an example code snippet to show how to help users handle data skew in jobs built with the DataStream API?

If you give more details, we can discuss further. I think if one design introduces an exact interface and another does not, the implementations will differ noticeably. For example, we introduce an exact API in DataStream named localKeyBy; for the pre-aggregation we need to define the trigger mechanism of local aggregation, so we found that reusing the window API and operator is a good choice. This is a line of reasoning from design to implementation.

What do you think?
Best,
Vino

Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 11:58 AM:

Hi Vino,

Now I feel that we may have different understandings about what kind of problems or improvements you want to resolve. Currently, most of the feedback is focusing on *how to do a proper local aggregation to improve performance and maybe solve the data skew issue*. And my gut feeling is that this is exactly what users want in the first place, especially those +1s. (Sorry for trying to summarize here; please correct me if I'm wrong.)

But I still think the design has somehow diverged from the goal.
If we want to have an efficient and powerful way to do local aggregation, supporting an intermediate result type is essential IMO. Both the runtime's `AggregateFunction` and SQL's `UserDefinedAggregateFunction` have proper support for an intermediate result type and can do a `merge` operation on them.

Now, we have a lightweight alternative which performs well and fits nicely with the local aggregation requirements. Most importantly, it's much less complex because it's stateless. And it can also achieve a similar multiple-aggregation scenario.

I am still not convinced why we shouldn't consider it as a first step.
Best,
Kurt

On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Kurt,

Thanks for your comments.

It seems we both implemented a local aggregation feature to mitigate the issue of data skew. However, IMHO, the API level at which the optimization benefit is obtained is different.
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> *Your optimization benefits Flink SQL and is not user-facing. (If I understand it incorrectly, please correct me.)*
>>>> > > > >> > > >> > > > > > >>>>>> *Our implementation offers it as an optimization tool API for DataStream; it is just like a local version of the keyBy API.*
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> Based on this, I want to say that supporting it as a DataStream API provides these advantages:
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> - The localKeyBy API has clear semantics and is flexible, not only for handling data skew but also for implementing some use cases. For example, if we want to calculate a multiple-level aggregation, we can do it within the local aggregation:
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>     input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is a sub-category, while "b" is a category
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>   Here we do not need to shuffle data over the network.
>>>> > > > >> > > >> > > > > > >>>>>> - Users of the DataStream API will benefit from this. We actually have many scenarios that need the DataStream API. Currently, the DataStream API is the cornerstone of the physical plan of Flink SQL. With a localKeyBy API, the SQL optimization could at least use this optimized API; this is a further topic.
>>>> > > > >> > > >> > > > > > >>>>>> - Being based on the window operator, our state benefits from Flink state and checkpointing, so we do not need to worry about OOM or job failures.
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> Now, about your questions:
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> 1. About our design not being able to change the data type, and about the implementation of average:
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> Just like my reply to Hequn, localKeyBy is an API provided to users who use the DataStream API to build their jobs. Users should know its semantics and how it differs from the keyBy API, so if they want an average aggregation, they should carry the local sum result and the local count result. I admit that it would be more convenient to use keyBy directly. But we need to pay a small price to get some benefits. I think this price is reasonable, considering that the DataStream API itself is a low-level API (at least for now).
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> 2. About the stateless operator and `StreamOperator::prepareSnapshotPreBarrier()`:
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> Actually, I have discussed this point with @dianfu in the old mail thread. I will copy my opinion from there:
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> - For your design, you still need somewhere to let users configure the trigger threshold (maybe memory availability?). This design cannot guarantee deterministic semantics, which makes testing and debugging harder.
>>>> > > > >> > > >> > > > > > >>>>>> - If the implementation depends on the timing of checkpoints, it would affect checkpoint progress, and the buffered data may cause OOM issues. In addition, if the operator is stateless, it cannot provide fault tolerance.
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> Best,
>>>> > > > >> > > >> > > > > > >>>>>> Vino
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>> On Tue, Jun 18, 2019 at 9:22 AM Kurt Young <ykt...@gmail.com> wrote:
>>>> > > > >> > > >> > > > > > >>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>> Hi Vino,
>>>> > > > >> > > >> > > > > > >>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>> Thanks for the proposal. I like the general idea, and IMO it's a very useful feature. But after reading through the document, I feel that we may be over-designing the operator required for proper local aggregation.
>>>> > > > >> > > >> > > > > > >>>>>>> The main reason is that we want a clear definition and behavior for the "local keyed state", which in my opinion is not necessary for local aggregation, at least to start with.
>>>> > > > >> > > >> > > > > > >>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>> Another issue I noticed is that the local key-by operator cannot change the element type. This also rules out many use cases that could benefit from local aggregation, such as "average".
>>>> > > > >> > > >> > > > > > >>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>> We also implemented similar logic in SQL, and the only thing that needs to be done is to introduce a stateless lightweight operator which is *chained* before `keyby()`.
>>>> > > > >> > > >> > > > > > >>>>>>> The operator flushes all buffered elements during `StreamOperator::prepareSnapshotPreBarrier()`, which keeps it stateless. By the way, in an earlier version we took a similar approach by introducing a stateful local aggregation operator, but it did not perform as well as the later one and also affected the barrier alignment time. The later one is fairly simple and more efficient.
>>>> > > > >> > > >> > > > > > >>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>> I would highly suggest that you consider a stateless approach as the first step.
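The buffer-and-flush idea described above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `LocalPreAggregator`, `processElement`, `flush`, and the `maxKeys` threshold are hypothetical names, not Flink classes. In the approach described, a call like `flush()` would be made from the operator's pre-barrier hook so that nothing remains to be snapshotted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical pre-aggregation buffer; this is not a Flink operator.
final class LocalPreAggregator {
    private final Map<String, Long> buffer = new HashMap<>();
    private final int maxKeys;
    private final BiConsumer<String, Long> downstream;

    LocalPreAggregator(int maxKeys, BiConsumer<String, Long> downstream) {
        this.maxKeys = maxKeys;
        this.downstream = downstream;
    }

    // Add one element to the in-memory partial sum for its key.
    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        if (buffer.size() >= maxKeys) {
            flush(); // bound memory use when many distinct keys arrive
        }
    }

    // Emit every partial sum downstream and clear the buffer, so that
    // no buffered data is left when a checkpoint is taken.
    void flush() {
        buffer.forEach(downstream);
        buffer.clear();
    }

    int bufferedKeys() {
        return buffer.size();
    }
}
```

Because each element is added to exactly one partial sum and each partial is emitted exactly once, a downstream sum per key stays correct. The size threshold is one possible trigger; the concern raised elsewhere in this thread about non-deterministic triggers applies to it.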
>>>> > > > >> > > >> > > > > > >>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>> Best,
>>>> > > > >> > > >> > > > > > >>>>>>> Kurt
>>>> > > > >> > > >> > > > > > >>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:
>>>> > > > >> > > >> > > > > > >>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>> Hi Vino,
>>>> > > > >> > > >> > > > > > >>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>> Thanks for the proposal.
>>>> > > > >> > > >> > > > > > >>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>> Regarding "input.keyBy(0).sum(1)" vs "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done any benchmarks? I'm curious how much performance improvement we can get by using a count window as the local operator.
>>>> > > > >> > > >> > > > > > >>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>> Best,
>>>> > > > >> > > >> > > > > > >>>>>>>> Jark
>>>> > > > >> > > >> > > > > > >>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> > > >> > > > > > >>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> Hi Hequn,
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> Thanks for your reply.
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> The purpose of the localKeyBy API is to provide a tool that lets users do pre-aggregation locally. The behavior of the pre-aggregation is similar to the keyBy API.
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> So the three cases are different. I will describe them one by one:
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> 1. input.keyBy(0).sum(1)
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> *In this case, the result is event-driven: each event produces one sum aggregation result, which is the latest total since the source started.*
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> *In this case, the semantics may be problematic. The local sum aggregation produces, for every event, the latest partial result since the source started.*
>>>> > > > >> > > >> > > > > > >>>>>>>>> *These latest partial results for the same key are hashed to one node to do the global sum aggregation.*
>>>> > > > >> > > >> > > > > > >>>>>>>>> *When the global aggregation receives multiple partial results (all calculated from the source start) and sums them, it gets the wrong result.*
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> *In this case, it just produces a partial aggregation result for the 5 records in the count window. The partial aggregation results for the same key are then aggregated globally.*
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> So the first case and the third case produce the *same* result; the differences are the output style and the latency.
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> Generally speaking, the local key API is just an optimization API. We do not limit how users use it, but users have to understand its semantics and use it correctly.
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> Best,
>>>> > > > >> > > >> > > > > > >>>>>>>>> Vino
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>> On Mon, Jun 17, 2019 at 4:18 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>>> > > > >> > > >> > > > > > >>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>> Hi Vino,
>>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>> Thanks for the proposal, I think it is a very good feature!
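The three cases from Vino's reply can be checked with a small plain-Java simulation for a single key and a single local sender. This is a sketch of the semantics as described in that reply, not Flink code: the class `LocalKeyBySemantics` and its method names are hypothetical, and `localKeyBy` itself is only a proposed API.

```java
import java.util.List;

// Plain-Java simulation of the three pipelines for one key.
final class LocalKeyBySemantics {

    // Case 1: keyBy(0).sum(1). A running global sum; returns the last value emitted.
    static long globalSum(List<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    // Case 2: localKeyBy(0).sum(1).keyBy(0).sum(1).
    // The local stage emits its running sum after every event, and the
    // global stage adds up all of those running sums.
    static long runningPartialsResummed(List<Long> values) {
        long localRunning = 0;
        long global = 0;
        for (long v : values) {
            localRunning += v;
            global += localRunning;
        }
        return global;
    }

    // Case 3: localKeyBy(0).countWindow(n).sum(1).keyBy(0).sum(1).
    // The local stage emits one partial sum per full window, so each
    // event is counted in exactly one partial.
    static long windowedPartialsResummed(List<Long> values, int windowSize) {
        long window = 0;
        long global = 0;
        int inWindow = 0;
        for (long v : values) {
            window += v;
            inWindow++;
            if (inWindow == windowSize) {
                global += window;
                window = 0;
                inWindow = 0;
            }
        }
        // Elements of an incomplete trailing window have not been emitted yet.
        return global;
    }
}
```

For the inputs 1 through 10, case 1 yields 55, case 2 yields 220 (each running sum is added again), and case 3 with a window of 5 yields 15 + 40 = 55. Cases 1 and 3 agree only once every window has fired, which matches the remark about differing output style and latency.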
>>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>> One thing I want to make sure of is the semantics of `localKeyBy`. According to the document, the `localKeyBy` API returns an instance of `KeyedStream`, which can also perform sum(). So in this case, what are the semantics of `localKeyBy()`? For example, will the following code produce the same result, and what are the differences between them?
>>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>> 1. input.keyBy(0).sum(1)
>>>> > > > >> > > >> > > > > > >>>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>>> > > > >> > > >> > > > > > >>>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>> It would also be great if we could add this to the document. Thank you very much.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>> Best, Hequn
>>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> > > >> > > > > > >>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> Hi Aljoscha,
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> I have looked at the "*Process*" section of the FLIP wiki page [1]. This mail thread indicates that it has proceeded to the third step.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> When I looked at the fourth step (the vote step), I didn't find the prerequisites for starting the voting process.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> Considering that this feature has already been discussed in the old thread [2], can you tell me when I should start voting?
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> Can I start now?
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> Best,
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> Vino
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> [2]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> On Thu, Jun 13, 2019 at 9:19 AM leesf <leesf0...@gmail.com> wrote:
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>> +1 for the FLIP. Thanks, Vino, for your efforts.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>> Best,
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>> Leesf
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>> On Wed, Jun 12, 2019 at 5:46 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> Hi folks,
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> I would like to start the FLIP discussion thread about supporting local aggregation in Flink.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> In short, this feature can effectively alleviate data skew.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> This is the FLIP:
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> *Motivation* (copied from FLIP)
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> Currently, keyed streams are widely used to perform aggregating operations (e.g., reduce, sum and window) on the elements that have the same key.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> When executed at runtime, the elements with the same key will be sent to and aggregated by the same task.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> The performance of these aggregating operations is very sensitive to the distribution of keys. In cases where the distribution of keys follows a power law, the performance will be significantly degraded. Worse, increasing the degree of parallelism does not help when a task is overloaded by a single key.
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> Local aggregation is a widely adopted method to reduce the performance degradation caused by data skew. We can decompose the aggregating operations into two phases. In the first phase, we aggregate the elements of the same key at the sender side to obtain partial results. Then, in the second phase, these partial results are sent to receivers according to their keys and are combined to obtain the final result.
> Since the number of partial results received by each receiver is limited by the number of senders, the imbalance among receivers can be reduced. In addition, reducing the amount of transferred data can further improve performance.
>
> *More details*:
>
> Design documentation:
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>
> Old discussion thread:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>
> JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>
>
> We are looking forward to your feedback!
>
> Best,
> Vino