Hi Vino,

So the difference between `DataStream.localKeyBy().process()` and
`DataStream.process()` is that the former can access keyed state and the
latter can only access operator state.
I think that is out of the scope of designing a local aggregation API. It
might be an extension of the state API, i.e. local keyed state.
The difference between local keyed state and operator state (if I
understand correctly) is that local keyed state can be backed by RocksDB? Or
that it makes "keyed state" local?
IMO, it's a larger topic than local aggregation and should be discussed
separately. I cc-ed people who work on state, @Tzu-Li (Gordon) Tai
<tzuli...@apache.org>, @Seth and @Yu Li, to give some feedback from the
perspective of state.

Regarding the updated API design in your FLIP, I have some concerns:

1) The "localKeyBy()" method returns a "KeyedStream", which exposes all of
its methods.
However, not every method makes sense or has a clear definition on a local
stream.
For example, "countWindow(long, long)", "timeWindow(long, long)",
"window(WindowAssigner)", and "intervalJoin", which Hequn mentioned before.
I would suggest exposing only the APIs we need for local aggregation
and leaving the others for later.
We can return a "LocalKeyedStream" and expose only some dedicated
methods, for example "aggregate()" and "trigger()".
These APIs do not need to expose local keyed state to support local
aggregation.

2) I think `localKeyBy().process()` is something that could be called "local
process", not just "local aggregate".
It needs more discussion about local keyed state, and I would like to leave
it out of this FLIP.
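A minimal in-memory sketch of the restricted "LocalKeyedStream" idea from point 1, assuming a simple count-based trigger. All names here (LocalKeyedStreamSketch, trigger(int), aggregate(...)) are hypothetical and are not Flink API. The point is only that the type offers nothing but a trigger and an aggregate call, so window() or intervalJoin() cannot be misused on a local stream, and no keyed state is exposed to the user.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

class LocalKeyedStreamDemo {
    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "a", "a", "c", "a");
        // Count records per key, flushing partial counts every 3 records.
        List<Map.Entry<String, Long>> partials =
                new LocalKeyedStreamSketch<String, String>(words, w -> w)
                        .trigger(3)
                        .aggregate(() -> 0L, (acc, w) -> acc + 1L);
        System.out.println(partials);
    }
}

// Hypothetical restricted "LocalKeyedStream": only trigger() and aggregate() exist.
final class LocalKeyedStreamSketch<T, K> {
    private final List<T> input;                 // records seen by one local subtask
    private final Function<T, K> keySelector;
    private int flushEvery = Integer.MAX_VALUE;  // default: flush only at end of input

    LocalKeyedStreamSketch(List<T> input, Function<T, K> keySelector) {
        this.input = input;
        this.keySelector = keySelector;
    }

    // Hypothetical count trigger: emit partial accumulators every n records.
    LocalKeyedStreamSketch<T, K> trigger(int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        this.flushEvery = n;
        return this;
    }

    // Pre-aggregates per key; emits (key, accumulator) on each firing and at end of input.
    <A> List<Map.Entry<K, A>> aggregate(Supplier<A> createAcc, BiFunction<A, T, A> add) {
        List<Map.Entry<K, A>> out = new ArrayList<>();
        Map<K, A> buffer = new HashMap<>();
        int seen = 0;
        for (T record : input) {
            K key = keySelector.apply(record);
            A acc = buffer.containsKey(key) ? buffer.get(key) : createAcc.get();
            buffer.put(key, add.apply(acc, record));
            if (++seen % flushEvery == 0) {
                flush(buffer, out);
            }
        }
        flush(buffer, out);
        return out;
    }

    private static <K, A> void flush(Map<K, A> buffer, List<Map.Entry<K, A>> out) {
        for (Map.Entry<K, A> e : buffer.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        buffer.clear();
    }
}
```

Each trigger firing emits one partial result per key seen since the last firing; a downstream keyed aggregation would still have to combine them.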


Regards,
Jark


On Thu, 27 Jun 2019 at 13:03, vino yang <yanghua1...@gmail.com> wrote:

> Hi all,
>
> I also think it's a good idea to agree on the API level first.
>
> I am sorry, we did not give any usage examples of the API in the FLIP
> documentation before. This may have caused some misunderstandings in
> this mail thread.
>
> So, now I have added some usage examples in the "Public Interfaces"
> section of the FLIP-44 documentation.
>
> Let us first get to know the API through these usage examples.
>
> If you have any feedback or questions, please let me know.
>
> Best,
> Vino
>
> On Thu, 27 Jun 2019 at 12:51, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> `DataStream.localKeyBy().process()` has a key difference from
>> `DataStream.process()`: the former accepts a `KeyedProcessFunction`
>> (sorry, my previous reply may have misled you), while the latter accepts a
>> `ProcessFunction`. The Javadoc of ProcessFunction contains this "*Note*"
>> statement:
>>
>>> Access to keyed state and timers (which are also scoped to a key) is only
>>> available if the ProcessFunction is applied on a KeyedStream.
>>
>>
>> In addition, you can compare their two implementations
>> (`ProcessOperator` and `KeyedProcessOperator`) to see the difference.
>>
>> IMO, this "Note" matters a lot in many scenarios.
>> For example, if we cannot access keyed state, we can only buffer data in
>> heap memory, which does not guarantee correct semantics!
>> Timers are also very important in some scenarios.
>>
>> That's why we say our API is flexible: it gets most of the benefits of
>> KeyedStream (even potential future ones).
>>
>> I have added some instructions on the use of localKeyBy in the FLIP-44
>> documentation.
>>
>> Best,
>> Vino
>>
>>
>> On Thu, 27 Jun 2019 at 10:44, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Piotr,
>>>
>>> I think the state migration you raised is a good point. Having
>>> "stream.enableLocalAggregation(Trigger)" might add some implicit operators
>>> whose uid users can't set, which causes state compatibility/evolution
>>> problems.
>>> So let's put this in rejected alternatives.
>>>
>>> Hi Vino,
>>>
>>> You mentioned several times that "DataStream.localKeyBy().process()" can
>>> solve the data skew problem of "DataStream.keyBy().process()".
>>> I'm curious about the difference between "DataStream.process()"
>>> and "DataStream.localKeyBy().process()".
>>> Can't "DataStream.process()" solve the data skew problem?
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Wed, 26 Jun 2019 at 18:20, Piotr Nowojski <pi...@ververica.com> wrote:
>>>
>>>> Hi Jark and Vino,
>>>>
>>>> I fully agree with Jark that, in order to keep the discussion focused
>>>> and limit the number of parallel topics, we should first focus on one
>>>> topic. We can decide on the API first and discuss the runtime details
>>>> later, at least as long as we keep the potential requirements of the
>>>> runtime part in mind while designing the API.
>>>>
>>>> Regarding the automatic optimisation proposed by Jark:
>>>>
>>>> "stream.enableLocalAggregation(Trigger)"
>>>>
>>>> I would be against that in the DataStream API for the reasons that Vino
>>>> presented. There was a discussion thread about future directions of Table
>>>> API vs DataStream API and the consensus was that the automatic
>>>> optimisations are one of the dividing lines between those two, for at least
>>>> a couple of reasons. Flexibility and full control over the program was one
>>>> of them. Another is state migration. Having
>>>> "stream.enableLocalAggregation(Trigger)", which might add some implicit
>>>> operators to the job graph, can cause problems with savepoint/checkpoint
>>>> compatibility.
>>>>
>>>> However, I haven't thought about or looked into the details of Vino's
>>>> API proposal, so I cannot fully judge it.
>>>>
>>>> Piotrek
>>>>
>>>> > On 26 Jun 2019, at 09:17, vino yang <yanghua1...@gmail.com> wrote:
>>>> >
>>>> > Hi Jark,
>>>> >
>>>> > Similar questions and responses have been repeated many times.
>>>> >
>>>> > Why didn't we spend more sections discussing the API?
>>>> >
>>>> > Because we try to reuse the capabilities of KeyedStream. The localKeyBy
>>>> > API just returns a KeyedStream; that's our design. We get all the
>>>> > benefits of KeyedStream and further benefits from WindowedStream. The
>>>> > APIs of KeyedStream and WindowedStream are long-tested and flexible.
>>>> > Yes, we spent a lot of space discussing local keyed state, but that's not
>>>> > the goal or the motivation; it's the way to implement local aggregation.
>>>> > It is much more complicated than the API we introduced, so we spent more
>>>> > sections on it. Of course, this is at the implementation level of the
>>>> > operator. We also agreed to support the buffer+flush implementation and
>>>> > added related instructions to the documentation. This needs to wait for
>>>> > the community's recognition; if the community agrees, we will give more
>>>> > instructions. What's more, I have said before that we welcome
>>>> > state-related commenters to participate in the discussion, but it is not
>>>> > wise to modify the FLIP title.
>>>> >
>>>> > About the API of local aggregation:
>>>> >
>>>> > I don't deny that ease of use is very important. But IMHO flexibility is
>>>> > the most important thing at the DataStream API level. Otherwise, what
>>>> > does DataStream mean? The significance of the DataStream API is that it
>>>> > is more flexible than Table/SQL; if it cannot provide that, everyone
>>>> > would just use Table/SQL.
>>>> >
>>>> > The DataStream API should focus more on flexibility than on automatic
>>>> > optimization, which gives users more possibilities to implement complex
>>>> > programs and meet specific scenarios. Many programs written with the
>>>> > DataStream API are far more complex than we think. It is very difficult
>>>> > to optimize at the API level, and the benefit is very low.
>>>> >
>>>> > I want to say that we support a more generalized local aggregation. I
>>>> > mentioned in the previous reply that aggregation is not limited to UDFs
>>>> > that implement AggregateFunction. In some complex scenarios, we have to
>>>> > support local aggregation through ProcessFunction and
>>>> > ProcessWindowFunction to solve the data skew problem. How would you
>>>> > support them with the API and optimization you mentioned?
>>>> >
>>>> > That flexible APIs can be combined arbitrarily into erroneous semantics
>>>> > does not prove that flexibility is meaningless, because the user is the
>>>> > decision maker. I have given examples many times: many APIs in
>>>> > DataStream also have little practical significance if combined
>>>> > arbitrarily. So users of flexible APIs need to understand what they are
>>>> > doing and what the right choice is.
>>>> >
>>>> > I think that if we keep debating this, we will not reach a conclusion.
>>>> >
>>>> > @Stephan Ewen <se...@apache.org>, @Aljoscha Krettek
>>>> > <aljos...@apache.org> and @Piotr Nowojski <pi...@ververica.com>,
>>>> > do you have further comments?
>>>> >
>>>> >
>>>> > On Wed, 26 Jun 2019 at 11:46, Jark Wu <imj...@gmail.com> wrote:
>>>> > Thanks for the long discussion Vino, Kurt, Hequn, Piotr and others,
>>>> >
>>>> > It seems that we still have some different ideas about the API
>>>> > (localKeyBy()?) and implementation details (reuse window operator? local
>>>> > keyed state?).
>>>> > And the discussion has stalled, mixing motivation, API and
>>>> > implementation.
>>>> >
>>>> > In order to make some progress on this topic, I want to summarize the
>>>> > points (pls correct me if I'm wrong or missing sth) and would suggest
>>>> > splitting the topic into the following aspects and discussing them one
>>>> > by one.
>>>> >
>>>> > 1) What's the main purpose of this FLIP?
>>>> >  - From the title of this FLIP, it is to support local aggregate.
>>>> > However, from the content of the FLIP, 80% of it introduces a new state
>>>> > called local keyed state.
>>>> >  - If we mainly want to introduce local keyed state, then we should
>>>> > re-title the FLIP and involve more people who work on state.
>>>> >  - If we mainly want to support local aggregate, then we can jump to
>>>> > step 2 to discuss the API design.
>>>> >
>>>> > 2) What does the API look like?
>>>> >  - Vino proposed to use "localKeyBy()" to do local processing; the
>>>> > output of the local processing is the result type of the aggregate
>>>> > function.
>>>> >   a) For non-windowed aggregate:
>>>> > input.localKeyBy(..).aggregate(agg1).keyBy(..).aggregate(agg2)
>>>> > **NOT SUPPORTED**
>>>> >   b) For windowed aggregate:
>>>> > input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)
>>>> >
>>>> > 3) What's the implementation detail?
>>>> >  - may reuse window operator or not.
>>>> >  - may introduce a new state concept or not.
>>>> >  - may not have state in local operator by flushing buffers in
>>>> > prepareSnapshotPreBarrier
>>>> >  - and so on...
>>>> >  - we can discuss these later when we reach a consensus on the API
>>>> > --------------------
>>>> >
>>>> > Here are my thoughts:
>>>> >
>>>> > 1) Purpose of this FLIP
>>>> >  - From the motivation section in the FLIP, I think the purpose is to
>>>> > support local aggregation to solve the data skew issue.
>>>> > Then I think we should focus on how to provide an easy-to-use and clear
>>>> > API to support **local aggregation**.
>>>> >  - Vino's point is centered around the local keyed state API (or
>>>> > localKeyBy()), and how to leverage the local keyed state API to support
>>>> > local aggregation.
>>>> > But I'm afraid it's not a good way to design an API for local
>>>> > aggregation.
>>>> >
>>>> > 2) local aggregation API
>>>> >  - IMO, the method call chain
>>>> > "input.localKeyBy(..).window(w1).aggregate(agg1).keyBy(..).window(w2).aggregate(agg2)"
>>>> > is not that easy to use, because we have to provide two implementations
>>>> > of an aggregation (one for partial agg, another for final agg). And we
>>>> > have to take care of the first window call: an inappropriate window call
>>>> > will break the semantics.
>>>> >  - From my point of view, local aggregation is a mature concept which
>>>> > should output the intermediate accumulator (ACC) for the past period of
>>>> > time (defined by a trigger). The downstream final aggregation then
>>>> > merges the ACCs received from the local side and outputs the current
>>>> > final result.
>>>> >  - The current "AggregateFunction" API in DataStream already has the
>>>> > accumulator type and a "merge" method. So the only things the user needs
>>>> > to do are to enable the local aggregation optimization and set a trigger.
>>>> >  - One idea that comes to mind: assume we have a windowed aggregation
>>>> > stream: "val stream = input.keyBy().window(w).aggregate(agg)". We can
>>>> > provide an API on the stream, for example
>>>> > "stream.enableLocalAggregation(Trigger)"; the trigger can be
>>>> > "ContinuousEventTimeTrigger.of(Time.minutes(1))". Then it will be
>>>> > optimized into a local operator + a final operator, and the local
>>>> > operator will combine records every minute on event time.
>>>> >  - In this way, there is only one line added, and the output is the same
>>>> > as before, because it is just an optimization.
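The claim above, that an aggregate function with an accumulator type and a merge method is enough for local plus final aggregation, can be checked with a small simulation. The AggFn interface below is a simplified, hypothetical stand-in that only mirrors the method names of Flink's AggregateFunction (createAccumulator, add, getResult, merge); no Flink classes are used, and the trigger is reduced to "end of input" for brevity. The user writes AVG once; the local side emits accumulators and the final side merges them.

```java
import java.util.List;

class TwoPhaseAvgDemo {
    public static void main(String[] args) {
        AvgFn fn = new AvgFn();
        // Two local subtasks see different slices of the same key's records.
        long[] acc1 = TwoPhase.localPhase(fn, List.of(1L, 2L, 3L));
        long[] acc2 = TwoPhase.localPhase(fn, List.of(10L));
        double avg = TwoPhase.finalPhase(fn, List.of(acc1, acc2));
        System.out.println(avg); // prints 4.0
    }
}

// Simplified stand-in mirroring the shape of an AggregateFunction<IN, ACC, OUT>.
interface AggFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// AVG: the accumulator is {sum, count}, not the average itself.
final class AvgFn implements AggFn<Long, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }
    public long[] add(Long value, long[] acc) { return new long[] {acc[0] + value, acc[1] + 1}; }
    public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
    public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
}

final class TwoPhase {
    // Local side: fold records into an ACC and emit the ACC (not the result).
    static <IN, ACC, OUT> ACC localPhase(AggFn<IN, ACC, OUT> fn, List<IN> records) {
        ACC acc = fn.createAccumulator();
        for (IN r : records) acc = fn.add(r, acc);
        return acc;
    }

    // Final side: merge the ACCs from all local subtasks, then compute the result.
    static <IN, ACC, OUT> OUT finalPhase(AggFn<IN, ACC, OUT> fn, List<ACC> partials) {
        ACC merged = fn.createAccumulator();
        for (ACC p : partials) merged = fn.merge(merged, p);
        return fn.getResult(merged);
    }
}
```

Because the local side emits {sum, count} rather than an average, the final result equals the average over all records, which is the property a one-line enableLocalAggregation optimization would rely on.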
>>>> >
>>>> >
>>>> > Regards,
>>>> > Jark
>>>> >
>>>> >
>>>> >
>>>> > On Tue, 25 Jun 2019 at 14:34, vino yang <yanghua1...@gmail.com> wrote:
>>>> >
>>>> > > Hi Kurt,
>>>> > >
>>>> > > Answer your questions:
>>>> > >
>>>> > > a) Sorry, I have only updated the Google doc so far and have not had
>>>> > > time to update the FLIP yet; I will update the FLIP as soon as possible.
>>>> > > About your description of this point, I have a question: what do you
>>>> > > mean by "how do we combine with `AggregateFunction`"?
>>>> > >
>>>> > > I have shown you the examples that are supported:
>>>> > >
>>>> > >    - input.localKeyBy(0).aggregate()
>>>> > >    - input.localKeyBy(0).window().aggregate()
>>>> > >
>>>> > > Could you show me an example of how to combine `AggregateFunction`
>>>> > > with your localAggregate API?
>>>> > >
>>>> > > As for the example of how to do local aggregation for AVG, consider this
>>>> > > code:
>>>> > >
>>>> > > DataStream<Tuple2<String, Long>> source = null; // placeholder input
>>>> > > source
>>>> > >     .localKeyBy(0)
>>>> > >     .timeWindow(Time.seconds(60))
>>>> > >     .aggregate(agg1, new WindowFunction<Tuple2<Long, Long>,
>>>> > >         Tuple3<String, Long, Long>, String, TimeWindow>() { /* attach key */ })
>>>> > >     .keyBy(0)
>>>> > >     .timeWindow(Time.seconds(60))
>>>> > >     .aggregate(agg2, new WindowFunction<Long,
>>>> > >         Tuple2<String, Long>, String, TimeWindow>() { /* attach key */ });
>>>> > >
>>>> > > agg1:
>>>> > > signature: new AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>,
>>>> > >     Tuple2<Long, Long>>() {}
>>>> > > input type: Tuple2<String, Long>, f0: key, f1: value
>>>> > > intermediate result type: Tuple2<Long, Long>, f0: locally aggregated sum;
>>>> > >     f1: locally aggregated count
>>>> > > output type: Tuple2<Long, Long>, f0: locally aggregated sum;
>>>> > >     f1: locally aggregated count
>>>> > >
>>>> > > agg2:
>>>> > > signature: new AggregateFunction<Tuple3<String, Long, Long>,
>>>> > >     Tuple2<Long, Long>, Long>() {}
>>>> > > input type: Tuple3<String, Long, Long>, f0: key, f1: locally aggregated
>>>> > >     sum; f2: locally aggregated count
>>>> > > intermediate result type: Tuple2<Long, Long>, f0: merged sum; f1: merged
>>>> > >     count
>>>> > > output type: Long, the avg result (merged sum / merged count); the
>>>> > >     second WindowFunction attaches the key to emit Tuple2<String, Long>
>>>> > >
>>>> > > For a sliding window, users just need to change the window type.
>>>> > > Again, we are providing the design and implementation at the DataStream
>>>> > > level, so I believe we can meet all the requirements coming from the SQL
>>>> > > level (only the implementation may differ).
>>>> > >
>>>> > > b) Yes, theoretically your thought is right. But in reality, it cannot
>>>> > > bring many benefits.
>>>> > > If we want to get the benefits of the window API, why would we not
>>>> > > reuse the window operator, and instead copy so much duplicated code
>>>> > > into another operator?
>>>> > >
>>>> > > c) OK, I agree to let the state backend committers join this
>>>> > > discussion.
>>>> > >
>>>> > > Best,
>>>> > > Vino
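The reason agg2 has to merge (sum, count) pairs instead of averaging agg1's outputs can be shown with made-up numbers for a single key split across two local subtasks:

```latex
\begin{aligned}
\text{subtask A: } & \{1, 2, 3\} \Rightarrow (s_A, c_A) = (6, 3), \quad \bar{x}_A = 2 \\
\text{subtask B: } & \{10\} \Rightarrow (s_B, c_B) = (10, 1), \quad \bar{x}_B = 10 \\
\text{average of local averages: } & \tfrac{1}{2}\,(\bar{x}_A + \bar{x}_B) = 6 \\
\text{merged sum / merged count: } & \frac{s_A + s_B}{c_A + c_B} = \frac{16}{4} = 4
\end{aligned}
```

Only the second value is the true average of {1, 2, 3, 10}, which is why the intermediate type for AVG differs from its result type.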
>>>> > >
>>>> > >
>>>> > > On Mon, 24 Jun 2019 at 18:53, Kurt Young <ykt...@gmail.com> wrote:
>>>> > >
>>>> > > > Hi vino,
>>>> > > >
>>>> > > > One thing to add: for a), I think one or two examples, such as how to
>>>> > > > do local aggregation on a sliding window and how to do local
>>>> > > > aggregation on an unbounded aggregate, would help a lot.
>>>> > > >
>>>> > > > Best,
>>>> > > > Kurt
>>>> > > >
>>>> > > >
>>>> > > > On Mon, Jun 24, 2019 at 6:06 PM Kurt Young <ykt...@gmail.com> wrote:
>>>> > > >
>>>> > > > > Hi vino,
>>>> > > > >
>>>> > > > > I think there are several things that still need discussion.
>>>> > > > >
>>>> > > > > a) We all agree that we should first go with a unified abstraction,
>>>> > > > > but the abstraction is not reflected in the FLIP.
>>>> > > > > If your answer is the "localKeyBy" API, then I would ask how we
>>>> > > > > combine it with `AggregateFunction`, and how we do proper local
>>>> > > > > aggregation for functions that have a different intermediate result
>>>> > > > > type, like AVG. Could you add these to the document?
>>>> > > > >
>>>> > > > > b) On the implementation side, reusing the window operator is one of
>>>> > > > > the possible solutions, but that does not mean we should base both
>>>> > > > > implementations on the window operator. My understanding is that one
>>>> > > > > of the possible implementations should not touch the window operator.
>>>> > > > >
>>>> > > > > c) 80% of your FLIP content actually describes how we support local
>>>> > > > > keyed state. I don't know if it is necessary to introduce this as the
>>>> > > > > first step, and we should also involve committers who work on the
>>>> > > > > state backend to share their thoughts.
>>>> > > > >
>>>> > > > > Best,
>>>> > > > > Kurt
>>>> > > > >
>>>> > > > >
>>>> > > > > On Mon, Jun 24, 2019 at 5:17 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >
>>>> > > > >> Hi Kurt,
>>>> > > > >>
>>>> > > > >> You did not give any further differing opinions, so I thought you had
>>>> > > > >> agreed with the design after we promised to support two kinds of
>>>> > > > >> implementation.
>>>> > > > >>
>>>> > > > >> At the API level, we have answered your question about passing an
>>>> > > > >> AggregateFunction to do the aggregation. Whether or not we introduce
>>>> > > > >> the localKeyBy API, we can support AggregateFunction.
>>>> > > > >>
>>>> > > > >> So what is your differing opinion now? Can you share it with us?
>>>> > > > >>
>>>> > > > >> Best,
>>>> > > > >> Vino
>>>> > > > >>
>>>> > > > >> On Mon, 24 Jun 2019 at 16:24, Kurt Young <ykt...@gmail.com> wrote:
>>>> > > > >>
>>>> > > > >> > Hi vino,
>>>> > > > >> >
>>>> > > > >> > Sorry, I don't see a consensus about reusing the window operator and
>>>> > > > >> > keeping the API design of localKeyBy. But I think we should
>>>> > > > >> > definitely give this topic more thought.
>>>> > > > >> >
>>>> > > > >> > I'm also looping Stephan into this discussion.
>>>> > > > >> >
>>>> > > > >> > Best,
>>>> > > > >> > Kurt
>>>> > > > >> >
>>>> > > > >> >
>>>> > > > >> > On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> >
>>>> > > > >> > > Hi all,
>>>> > > > >> > >
>>>> > > > >> > > I am happy we have had a wonderful discussion and received many
>>>> > > > >> > > valuable opinions in the last few days.
>>>> > > > >> > >
>>>> > > > >> > > Now, let me try to summarize the design changes we have reached
>>>> > > > >> > > consensus on.
>>>> > > > >> > >
>>>> > > > >> > >    - provide a unified abstraction to support two kinds of
>>>> > > > >> > >    implementation;
>>>> > > > >> > >    - reuse WindowOperator and try to enhance it so that the
>>>> > > > >> > >    intermediate result of the local aggregation can be buffered
>>>> > > > >> > >    and flushed, to support two kinds of implementation;
>>>> > > > >> > >    - keep the API design of localKeyBy, but declare the APIs we
>>>> > > > >> > >    cannot support currently as disabled, and provide a
>>>> > > > >> > >    configurable API for users to choose how to handle the
>>>> > > > >> > >    intermediate result;
>>>> > > > >> > >
>>>> > > > >> > > The above three points have been updated in the design doc. If you
>>>> > > > >> > > have any questions, please let me know.
>>>> > > > >> > >
>>>> > > > >> > > @Aljoscha Krettek <aljos...@apache.org> What do you think? Any
>>>> > > > >> > > further comments?
>>>> > > > >> > >
>>>> > > > >> > > Best,
>>>> > > > >> > > Vino
>>>> > > > >> > >
>>>> > > > >> > > On Thu, 20 Jun 2019 at 14:02, vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> > >
>>>> > > > >> > > > Hi Kurt,
>>>> > > > >> > > >
>>>> > > > >> > > > Thanks for your comments.
>>>> > > > >> > > >
>>>> > > > >> > > > It seems we have come to a consensus that we should alleviate
>>>> > > > >> > > > the performance degradation caused by data skew with local
>>>> > > > >> > > > aggregation. In this FLIP, our key solution is to introduce a
>>>> > > > >> > > > local keyed partition to achieve this goal.
>>>> > > > >> > > >
>>>> > > > >> > > > I also agree that we can benefit a lot from the usage of
>>>> > > > >> > > > AggregateFunction. In combination with localKeyBy, we can easily
>>>> > > > >> > > > use it to achieve local aggregation:
>>>> > > > >> > > >
>>>> > > > >> > > >    - input.localKeyBy(0).aggregate()
>>>> > > > >> > > >    - input.localKeyBy(0).window().aggregate()
>>>> > > > >> > > >
>>>> > > > >> > > >
>>>> > > > >> > > > I think the only problem here is the choice between
>>>> > > > >> > > >
>>>> > > > >> > > >    - (1) introducing a new primitive called localKeyBy and
>>>> > > > >> > > >    implementing local aggregation with existing operators, or
>>>> > > > >> > > >    - (2) introducing an operator called localAggregation, which
>>>> > > > >> > > >    is composed of a key selector, a window-like operator, and an
>>>> > > > >> > > >    aggregate function.
>>>> > > > >> > > >
>>>> > > > >> > > >
>>>> > > > >> > > > There may be some optimization opportunities in providing a
>>>> > > > >> > > > composite interface for local aggregation. But at the same time,
>>>> > > > >> > > > in my opinion, we lose flexibility (or we need certain efforts to
>>>> > > > >> > > > achieve the same flexibility).
>>>> > > > >> > > >
>>>> > > > >> > > > As said in previous mails, we have many use cases where the
>>>> > > > >> > > > aggregation is very complicated and cannot be performed with
>>>> > > > >> > > > AggregateFunction. For example, users may perform windowed
>>>> > > > >> > > > aggregations according to time, data values, or even external
>>>> > > > >> > > > storage. Typically, they now use KeyedProcessFunction or
>>>> > > > >> > > > customized triggers to implement these aggregations. It's not
>>>> > > > >> > > > easy to address data skew in such cases with a composite
>>>> > > > >> > > > interface for local aggregation.
>>>> > > > >> > > >
>>>> > > > >> > > > Given that the DataStream API is targeted exactly at these cases,
>>>> > > > >> > > > where the application logic is very complicated and optimization
>>>> > > > >> > > > does not matter, I think it's a better choice to provide a
>>>> > > > >> > > > relatively low-level and canonical interface.
>>>> > > > >> > > >
>>>> > > > >> > > > The composite interface, on the other hand, may be a good choice
>>>> > > > >> > > > for declarative interfaces, including SQL and the Table API, as it
>>>> > > > >> > > > allows more optimization opportunities.
>>>> > > > >> > > >
>>>> > > > >> > > > Best,
>>>> > > > >> > > > Vino
>>>> > > > >> > > >
>>>> > > > >> > > >
>>>> > > > >> > > > On Thu, 20 Jun 2019 at 10:15, Kurt Young <ykt...@gmail.com> wrote:
>>>> > > > >> > > >
>>>> > > > >> > > >> Hi all,
>>>> > > > >> > > >>
>>>> > > > >> > > >> As vino said in previous emails, I think we should first
>>>> > > > >> > > >> discuss and decide what kind of use cases this FLIP wants to
>>>> > > > >> > > >> resolve, and what the API should look like. From my side, I
>>>> > > > >> > > >> think this is probably the root cause of the current divergence.
>>>> > > > >> > > >>
>>>> > > > >> > > >> My understanding is (from the FLIP title and the motivation
>>>> > > > >> > > >> section of the document) that we want proper support for
>>>> > > > >> > > >> local aggregation, or pre-aggregation. This is not a very new
>>>> > > > >> > > >> idea; most SQL engines already have this improvement. And the
>>>> > > > >> > > >> core concept is that there should be an AggregateFunction,
>>>> > > > >> > > >> whether it's Flink runtime's AggregateFunction or SQL's
>>>> > > > >> > > >> UserDefinedAggregateFunction. Both have the concept of an
>>>> > > > >> > > >> intermediate data type, sometimes called ACC.
>>>> > > > >> > > >> I quickly went through the POC Piotr did before [1]; it also
>>>> > > > >> > > >> directly uses AggregateFunction.
>>>> > > > >> > > >>
>>>> > > > >> > > >> But the thing is, after reading the design of this FLIP, I
>>>> > > > >> > > >> can't help feeling that this FLIP is not targeting proper
>>>> > > > >> > > >> local aggregation support. It actually wants to introduce
>>>> > > > >> > > >> another concept, LocalKeyBy, plus how to split and merge local
>>>> > > > >> > > >> key groups and how to properly support state on local keys.
>>>> > > > >> > > >> Local aggregation just happens to be one possible use case of
>>>> > > > >> > > >> LocalKeyBy. But it lacks support for the essential concept of
>>>> > > > >> > > >> local aggregation, which is the intermediate data type. Without
>>>> > > > >> > > >> this, I really don't think it is a good fit for local
>>>> > > > >> > > >> aggregation.
>>>> > > > >> > > >>
>>>> > > > >> > > >> Here I want to make sure of the scope or goal of this FLIP: do
>>>> > > > >> > > >> we want a proper local aggregation engine, or do we just want
>>>> > > > >> > > >> to introduce a new concept called LocalKeyBy?
>>>> > > > >> > > >>
>>>> > > > >> > > >> [1]: https://github.com/apache/flink/pull/4626
>>>> > > > >> > > >>
>>>> > > > >> > > >> Best,
>>>> > > > >> > > >> Kurt
>>>> > > > >> > > >>
>>>> > > > >> > > >>
>>>> > > > >> > > >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> wrote:
>>>> > > > >> > > >>
>>>> > > > >> > > >> > Hi Hequn,
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Thanks for your comments!
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > I agree that allowing local aggregation to reuse the window
>>>> > > > >> > > >> > API, and refining the window operator so that it meets both
>>>> > > > >> > > >> > sets of requirements (ours and Kurt's), is a good decision!
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Concerning your questions:
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>>> > > > >> > > >> > may be meaningless.
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Yes, it does not make sense in most cases. However, I also
>>>> > > > >> > > >> > want to note that users should know the right semantics of
>>>> > > > >> > > >> > localKeyBy and use it correctly, because this issue also
>>>> > > > >> > > >> > exists for the global keyBy. Consider this example:
>>>> > > > >> > > >> > input.keyBy(0).sum(1).keyBy(0).sum(1); the result is also
>>>> > > > >> > > >> > meaningless.
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > 2. About the semantics of
>>>> > > > >> > > >> > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Good catch! I agree with you that it's not good to enable all
>>>> > > > >> > > >> > KeyedStream functionalities for localKeyBy.
>>>> > > > >> > > >> > Currently, we do not support some APIs such as
>>>> > > > >> > > >> > connect/join/intervalJoin/coGroup. This is because we force
>>>> > > > >> > > >> > the operators on LocalKeyedStreams to be chained with their
>>>> > > > >> > > >> > inputs.
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > Best,
>>>> > > > >> > > >> > Vino
>>>> > > > >> > > >> >
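Why input.keyBy(0).sum(1).keyBy(0).sum(1) is meaningless: a rolling sum emits an updated running total for every incoming record, so a second rolling sum adds up totals that already contain earlier values. The plain-Java sketch below models a single key; rollingSum is a hypothetical helper, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

class RollingSumDemo {
    // Emits the running total after each record, like a rolling per-key sum.
    static List<Long> rollingSum(List<Long> values) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (long v : values) {
            total += v;
            out.add(total);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> input = List.of(1L, 1L, 1L); // three records for one key
        List<Long> once = rollingSum(input);    // last value 3 is the true sum
        List<Long> twice = rollingSum(once);    // running totals summed again
        System.out.println(once + " " + twice); // prints [1, 2, 3] [1, 3, 6]
    }
}
```

Presumably the same double counting would apply to input.localKeyBy(0).sum(1).keyBy(0).sum(1) if the local sum also emitted a running total per record.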
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > On Wed, 19 Jun 2019 at 15:42, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>> > > > >> > > >> >
>>>> > > > >> > > >> > > Hi,
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > Thanks a lot for the great discussion, and it's great to see
>>>> > > > >> > > >> > > that some agreement has been reached on the "local aggregate
>>>> > > > >> > > >> > > engine"!
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > ===> Considering the abstract engine,
>>>> > > > >> > > >> > > I'm wondering whether it is valuable for us to extend the
>>>> > > > >> > > >> > > current window to meet both demands raised by Kurt and Vino.
>>>> > > > >> > > >> > > There are some benefits we can get:
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > 1. The interfaces of the window are complete and clear. With
>>>> > > > >> > > >> > > windows, we can define a lot of ways to split the data and
>>>> > > > >> > > >> > > perform different computations.
>>>> > > > >> > > >> > > 2. We can also leverage the window to do miniBatch for the
>>>> > > > >> > > >> > > global aggregation, i.e., we can use the window to bundle data
>>>> > > > >> > > >> > > belonging to the same key; for every bundle we only need to
>>>> > > > >> > > >> > > read and write state once. This can greatly reduce state IO
>>>> > > > >> > > >> > > and improve performance.
>>>> > > > >> > > >> > > 3. A lot of other use cases can also benefit from windows that
>>>> > > > >> > > >> > > are memory-based or stateless.
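Point 2 (bundling records of the same key so that state is read and written once per bundle) can be illustrated by counting state accesses. In this hypothetical plain-Java model a HashMap stands in for the state backend and two counters stand in for state IO; a real miniBatch would also flush on bundle size or time, which is omitted here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MiniBatchDemo {
    // A HashMap stands in for the keyed state backend; accesses to it are counted.
    static final class CountingState {
        final Map<String, Long> store = new HashMap<>();
        int reads;
        int writes;

        long get(String key) { reads++; return store.getOrDefault(key, 0L); }

        void put(String key, long v) { writes++; store.put(key, v); }
    }

    // One state read + one state write per record.
    static CountingState perRecord(List<String> keys) {
        CountingState s = new CountingState();
        for (String k : keys) s.put(k, s.get(k) + 1);
        return s;
    }

    // Bundle the batch in memory first, then one read + one write per distinct key.
    static CountingState miniBatch(List<String> keys) {
        Map<String, Long> bundle = new LinkedHashMap<>();
        for (String k : keys) bundle.merge(k, 1L, Long::sum);
        CountingState s = new CountingState();
        bundle.forEach((k, c) -> s.put(k, s.get(k) + c));
        return s;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("hot", "hot", "hot", "hot", "cold");
        CountingState a = perRecord(batch);
        CountingState b = miniBatch(batch);
        System.out.println(a.reads + "/" + a.writes + " vs " + b.reads + "/" + b.writes); // prints 5/5 vs 2/2
        System.out.println(a.store.equals(b.store)); // prints true
    }
}
```

Both variants end with the same state contents; only the number of state accesses differs, and the gain grows as fewer distinct keys appear per bundle, i.e. with more skew.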
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > ===> As for the API,
>>>> > > > >> > > >> > > I think it is good to make our API more flexible. However, we
>>>> > > > >> > > >> > > may need to make our API meaningful.
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > Take my previous reply as an example: for
>>>> > > > >> > > >> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1), the result may be
>>>> > > > >> > > >> > > meaningless. Another example I found is intervalJoin, e.g.,
>>>> > > > >> > > >> > > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In
>>>> > > > >> > > >> > > this case, there will be problems if input1 and input2 have
>>>> > > > >> > > >> > > different parallelism: which input should the join be
>>>> > > > >> > > >> > > chained with? Even if they have the same parallelism, it's
>>>> > > > >> > > >> > > hard to tell what the join is doing. There may be other
>>>> > > > >> > > >> > > problems as well.
>>>> > > > >> > > >> > >
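One way to see why input.localKeyBy(0).sum(1).keyBy(0).sum(1) can be meaningless: KeyedStream's `sum` is a rolling aggregation that emits the updated running total for every input record. Assuming localKeyBy keeps those rolling semantics (an assumption about the proposal, not something stated here), the global sum adds up running totals instead of raw values. The sketch below simulates this for a single key on a single local instance, in plain Java rather than Flink; `RollingSumSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class RollingSumSketch {
    // Mimics a rolling keyed sum for one key: emits the updated running total per input record.
    static List<Long> rollingSum(List<Long> values) {
        List<Long> out = new ArrayList<>();
        long running = 0;
        for (long v : values) {
            running += v;
            out.add(running);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> localOut = rollingSum(List.of(1L, 2L, 3L)); // local step: 1, 3, 6
        List<Long> globalOut = rollingSum(localOut);           // global step: 1, 4, 10
        long last = globalOut.get(globalOut.size() - 1);
        System.out.println("true sum 6, global result " + last);
    }
}
```

For the input 1, 2, 3 the local step emits 1, 3, 6 and the global step ends at 10, whereas the true sum is 6.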
>>>> > > > >> > > >> > > From this point of view, wouldn't it be better not to enable
>>>> > > > >> > > >> > > all of KeyedStream's functionality for localKeyBy?
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > It would be great to hear your opinions as well.
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > Best, Hequn
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>> wrote:
>>>> > > > >> > > >> > >
>>>> > > > >> > > >> > > > Hi Kurt and Piotrek,
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > Thanks for your comments.
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > I agree that we can provide a better abstraction that is
>>>> > > > >> > > >> > > > compatible with two different implementations.
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > First of all, I think we should consider which scenarios we
>>>> > > > >> > > >> > > > need to support at the *API* level.
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > We have some use cases that need a customized aggregation
>>>> > > > >> > > >> > > > through KeyedProcessFunction (with our localKeyBy.window,
>>>> > > > >> > > >> > > > they can use ProcessWindowFunction).
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > Shall we support these flexible use scenarios?
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > Best,
>>>> > > > >> > > >> > > > Vino
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > On Tue, 18 Jun 2019 at 20:37, Kurt Young <ykt...@gmail.com <mailto:ykt...@gmail.com>> wrote:
>>>> > > > >> > > >> > > >
>>>> > > > >> > > >> > > > > Hi Piotr,
>>>> > > > >> > > >> > > > >
>>>> > > > >> > > >> > > > > Thanks for joining the discussion. Making "local
>>>> > > > >> > > >> > > > > aggregation" abstract enough sounds good to me; we could
>>>> > > > >> > > >> > > > > implement and verify alternative solutions for local
>>>> > > > >> > > >> > > > > aggregation use cases. Maybe we will find that both
>>>> > > > >> > > >> > > > > solutions are appropriate for different scenarios.
>>>> > > > >> > > >> > > > >
>>>> > > > >> > > >> > > > > Starting with a simple one sounds like a practical way to
>>>> > > > >> > > >> > > > > go. What do you think, Vino?
>>>> > > > >> > > >> > > > >
>>>> > > > >> > > >> > > > > Best,
>>>> > > > >> > > >> > > > > Kurt
>>>> > > > >> > > >> > > > >
>>>> > > > >> > > >> > > > >
>>>> > > > >> > > >> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com <mailto:pi...@ververica.com>> wrote:
>>>> > > > >> > > >> > > > >
>>>> > > > >> > > >> > > > > > Hi Kurt and Vino,
>>>> > > > >> > > >> > > > > >
>>>> > > > >> > > >> > > > > > I think there is a trade-off that we need to consider for
>>>> > > > >> > > >> > > > > > the local aggregation.
>>>> > > > >> > > >> > > > > >
>>>> > > > >> > > >> > > > > > Generally speaking, I would agree with Kurt that local
>>>> > > > >> > > >> > > > > > aggregation/pre-aggregation should not use Flink's state
>>>> > > > >> > > >> > > > > > and should instead flush the operator on a checkpoint.
>>>> > > > >> > > >> > > > > > Network IO is usually cheaper than disk IO. However, this
>>>> > > > >> > > >> > > > > > has a couple of issues:
>>>> > > > >> > > >> > > > > > 1. It can explode the number of in-flight records during
>>>> > > > >> > > >> > > > > > checkpoint barrier alignment, making checkpointing slower
>>>> > > > >> > > >> > > > > > and decreasing the actual throughput.
>>>> > > > >> > > >> > > > > > 2. This trades disk IO on the local aggregation machine
>>>> > > > >> > > >> > > > > > for CPU (and disk IO in the case of RocksDB) on the final
>>>> > > > >> > > >> > > > > > aggregation machine. This is fine as long as there is no
>>>> > > > >> > > >> > > > > > huge data skew. If there is only a handful of hot keys (or
>>>> > > > >> > > >> > > > > > even a single one), it might be better to keep the
>>>> > > > >> > > >> > > > > > persistent state in the LocalAggregationOperator to
>>>> > > > >> > > >> > > > > > offload the final aggregation as much as possible.
>>>> > > > >> > > >> > > > > > 3. With frequent checkpointing, the effectiveness of local
>>>> > > > >> > > >> > > > > > aggregation would degrade.
>>>> > > > >> > > >> > > > > >
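Point 3 can be made concrete by counting partial results. A minimal plain-Java sketch, assuming a hypothetical pre-aggregator that flushes its buffer every `flushEvery` records (standing in for the checkpoint interval) and emits one partial result per distinct buffered key. Real checkpoint intervals are time-based, so this only illustrates the trend.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FlushFrequencySketch {
    // Counts the partial results emitted if the buffer is flushed every flushEvery records
    // (and once more at the end of input). Each flush emits one record per distinct key.
    static int emittedRecords(List<String> keys, int flushEvery) {
        int emitted = 0;
        Set<String> buffered = new HashSet<>();
        for (int i = 0; i < keys.size(); i++) {
            buffered.add(keys.get(i));
            boolean flush = (i + 1) % flushEvery == 0 || i == keys.size() - 1;
            if (flush) {
                emitted += buffered.size();
                buffered.clear();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 8 records spread over 2 alternating hot keys
        List<String> keys = List.of("a", "b", "a", "b", "a", "b", "a", "b");
        System.out.println(emittedRecords(keys, 8));
        System.out.println(emittedRecords(keys, 4));
        System.out.println(emittedRecords(keys, 2));
    }
}
```

With two alternating keys and eight records, flushing once emits 2 partial results, flushing every 4 records emits 4, and flushing every 2 records emits 8, i.e. no reduction at all compared with forwarding every record.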
>>>> > > > >> > > >> > > > > > I assume Kurt is correct that in your use cases the
>>>> > > > >> > > >> > > > > > stateless operator behaved better, but I could easily see
>>>> > > > >> > > >> > > > > > other use cases as well. For example, someone is already
>>>> > > > >> > > >> > > > > > using RocksDB, and their job is bottlenecked on a single
>>>> > > > >> > > >> > > > > > window operator instance because of data skew. In that
>>>> > > > >> > > >> > > > > > case, stateful local aggregation would probably be a
>>>> > > > >> > > >> > > > > > better choice.
>>>> > > > >> > > >> > > > > >
>>>> > > > >> > > >> > > > > > Because of that, I think we should eventually provide both
>>>> > > > >> > > >> > > > > > versions, and in the initial version we should at least
>>>> > > > >> > > >> > > > > > make the "local aggregation engine" abstract enough that
>>>> > > > >> > > >> > > > > > one could easily provide a different implementation
>>>> > > > >> > > >> > > > > > strategy.
>>>> > > > >> > > >> > > > > >
>>>> > > > >> > > >> > > > > > Piotrek
>>>> > > > >> > > >> > > > > >
>>>> > > > >> > > >> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com <mailto:ykt...@gmail.com>> wrote:
>>>> > > > >> > > >> > > > > > >
>>>> > > > >> > > >> > > > > > > Hi,
>>>> > > > >> > > >> > > > > > >
>>>> > > > >> > > >> > > > > > > For the trigger, it depends on which operator we want to
>>>> > > > >> > > >> > > > > > > use under the API. If we choose to use the window
>>>> > > > >> > > >> > > > > > > operator, we should also use the window's trigger.
>>>> > > > >> > > >> > > > > > > However, I also think reusing the window operator for
>>>> > > > >> > > >> > > > > > > this scenario may not be the best choice, for the
>>>> > > > >> > > >> > > > > > > following reasons:
>>>> > > > >> > > >> > > > > > >
>>>> > > > >> > > >> > > > > > > 1. As a lot of people have already pointed out, the
>>>> > > > >> > > >> > > > > > > window relies heavily on state, and it will definitely
>>>> > > > >> > > >> > > > > > > affect performance. You can argue that one can use a
>>>> > > > >> > > >> > > > > > > heap-based state backend, but this will introduce extra
>>>> > > > >> > > >> > > > > > > coupling, especially since we have a chance to design a
>>>> > > > >> > > >> > > > > > > purely stateless operator.
>>>> > > > >> > > >> > > > > > > 2. The window operator is *the most* complicated
>>>> > > > >> > > >> > > > > > > operator Flink currently has. Maybe we only need to pick
>>>> > > > >> > > >> > > > > > > a subset of the window operator to achieve the goal, but
>>>> > > > >> > > >> > > > > > > once users want to take a deep look at the
>>>> > > > >> > > >> > > > > > > localAggregation operator, it's still hard to find out
>>>> > > > >> > > >> > > > > > > what's going on under the window operator. For
>>>> > > > >> > > >> > > > > > > simplicity, I would also recommend introducing a
>>>> > > > >> > > >> > > > > > > dedicated lightweight operator, which is also much
>>>> > > > >> > > >> > > > > > > easier for users to learn and use.
>>>> > > > >> > > >> > > > > > >
>>>> > > > >> > > >> > > > > > > Regarding your question about increasing the burden in
>>>> > > > >> > > >> > > > > > > `StreamOperator::prepareSnapshotPreBarrier()`: the only
>>>> > > > >> > > >> > > > > > > thing this function needs to do is output all the
>>>> > > > >> > > >> > > > > > > partial results. It's a purely CPU workload and does not
>>>> > > > >> > > >> > > > > > > introduce any IO. I want to point out that even if we
>>>> > > > >> > > >> > > > > > > have this cost, we save another barrier alignment cost
>>>> > > > >> > > >> > > > > > > of the operator, namely the synchronous flush stage of
>>>> > > > >> > > >> > > > > > > the state, if you introduce state. That flush actually
>>>> > > > >> > > >> > > > > > > introduces disk IO, and I think it's worth exchanging
>>>> > > > >> > > >> > > > > > > that cost for a purely CPU workload. We do have some
>>>> > > > >> > > >> > > > > > > observations about these two behaviors (as I said
>>>> > > > >> > > >> > > > > > > before, we actually implemented both solutions): the
>>>> > > > >> > > >> > > > > > > stateless one performs better in both overall
>>>> > > > >> > > >> > > > > > > performance and barrier alignment time.
>>>> > > > >> > > >> > > > > > >
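The stateless variant described here can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the implementation discussed in the thread: `BufferedLocalSum` is a hypothetical class that only borrows the name of `StreamOperator::prepareSnapshotPreBarrier()`, sums `long` values per key, and uses a simple record-count trigger. Because the buffer is drained before the barrier is forwarded, under these assumptions there is no buffered partial result that would need to be restored from the checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stateless local sum aggregator in plain Java (not a Flink operator).
class BufferedLocalSum {
    private final Map<String, Long> buffer = new HashMap<>();
    private final int maxBufferedRecords;
    private final BiConsumer<String, Long> downstream;
    private int bufferedRecords = 0;

    BufferedLocalSum(int maxBufferedRecords, BiConsumer<String, Long> downstream) {
        this.maxBufferedRecords = maxBufferedRecords;
        this.downstream = downstream;
    }

    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum); // pre-aggregate in heap memory only
        bufferedRecords++;
        if (bufferedRecords >= maxBufferedRecords) {
            flush(); // count trigger
        }
    }

    // Borrows the name of the StreamOperator hook: called before the barrier is forwarded,
    // it only emits the buffered partials (CPU work, no state IO).
    void prepareSnapshotPreBarrier(long checkpointId) {
        flush();
    }

    boolean isBufferEmpty() {
        return buffer.isEmpty();
    }

    private void flush() {
        buffer.forEach(downstream); // send every partial result downstream
        buffer.clear();
        bufferedRecords = 0;
    }
}

class BufferedLocalSumDemo {
    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        BufferedLocalSum op = new BufferedLocalSum(1000, (k, v) -> emitted.add(k + "=" + v));
        op.processElement("a", 1);
        op.processElement("a", 2);
        op.processElement("b", 5);
        op.prepareSnapshotPreBarrier(1L);
        // emitted now holds a=3 and b=5 (HashMap iteration order); the buffer is empty
        System.out.println(emitted + " empty=" + op.isBufferEmpty());
    }
}
```

The trade-off Piotr describes is visible here: every barrier forces a flush, so the records emitted just before it are in-flight during alignment.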
>>>> > > > >> > > >> > > > > > > Best,
>>>> > > > >> > > >> > > > > > > Kurt
>>>> > > > >> > > >> > > > > > >
>>>> > > > >> > > >> > > > > > >
>>>> > > > >> > > >> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>> wrote:
>>>> > > > >> > > >> > > > > > >
>>>> > > > >> > > >> > > > > > >> Hi Kurt,
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> Thanks for your example. Now it's much clearer to me.
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> From your example code snippet, I see that the
>>>> > > > >> > > >> > > > > > >> localAggregate API has three parameters:
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >>   1. key field
>>>> > > > >> > > >> > > > > > >>   2. PartitionAvg
>>>> > > > >> > > >> > > > > > >>   3. CountTrigger: Does this trigger come from the window
>>>> > > > >> > > >> > > > > > >>   package?
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> I will compare our design and yours at the API level and
>>>> > > > >> > > >> > > > > > >> the operator level:
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> *From the API level:*
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> As I replied to @dianfu in the old email thread [1], the
>>>> > > > >> > > >> > > > > > >> Window API can already provide the second and third
>>>> > > > >> > > >> > > > > > >> parameters.
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> If you reuse specific interfaces or classes provided by
>>>> > > > >> > > >> > > > > > >> the window package, such as *Trigger* or *CountTrigger*,
>>>> > > > >> > > >> > > > > > >> but do not use the window API, that's not reasonable.
>>>> > > > >> > > >> > > > > > >> And if you do not reuse these interfaces or classes, you
>>>> > > > >> > > >> > > > > > >> would need to introduce more things that look similar to
>>>> > > > >> > > >> > > > > > >> the ones provided by the window package.
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> The window package already provides several types of
>>>> > > > >> > > >> > > > > > >> windows and many triggers, and lets users customize them.
>>>> > > > >> > > >> > > > > > >> What's more, users are more familiar with the Window API.
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> This is why we only provide the localKeyBy API and reuse
>>>> > > > >> > > >> > > > > > >> the window API. It avoids unnecessary components such as
>>>> > > > >> > > >> > > > > > >> triggers and the buffering mechanism (based on record
>>>> > > > >> > > >> > > > > > >> count or time), and it has clear, easy-to-understand
>>>> > > > >> > > >> > > > > > >> semantics.
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> *From the operator level:*
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> We reused the window operator, so we get all the benefits
>>>> > > > >> > > >> > > > > > >> of state and checkpointing.
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> In your design, you call the operator under the
>>>> > > > >> > > >> > > > > > >> localAggregate API a *stateless* operator. IMO, it still
>>>> > > > >> > > >> > > > > > >> holds state; it is just not Flink-managed state.
>>>> > > > >> > > >> > > > > > >> About the memory buffer (I think it's still not very
>>>> > > > >> > > >> > > > > > >> clear; if you have time, could you give more detailed
>>>> > > > >> > > >> > > > > > >> information or answer my questions?), I have some
>>>> > > > >> > > >> > > > > > >> questions:
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >>   - If it's just a raw JVM heap memory buffer, how do you
>>>> > > > >> > > >> > > > > > >>   support fault tolerance if the job is configured with an
>>>> > > > >> > > >> > > > > > >>   EXACTLY-ONCE semantic guarantee?
>>>> > > > >> > > >> > > > > > >>   - If you think the memory buffer (non-Flink state) has
>>>> > > > >> > > >> > > > > > >>   better performance: in our design, users can also
>>>> > > > >> > > >> > > > > > >>   configure the HEAP state backend to get performance close
>>>> > > > >> > > >> > > > > > >>   to your mechanism.
>>>> > > > >> > > >> > > > > > >>   - `StreamOperator::prepareSnapshotPreBarrier()` is related
>>>> > > > >> > > >> > > > > > >>   to the timing of the snapshot. IMO, shouldn't the flush
>>>> > > > >> > > >> > > > > > >>   be a synchronous action? (If not, please point out my
>>>> > > > >> > > >> > > > > > >>   mistake.) I still think we should not depend on the
>>>> > > > >> > > >> > > > > > >>   timing of the checkpoint. Checkpoint-related operations
>>>> > > > >> > > >> > > > > > >>   are inherently performance-sensitive, so we should not
>>>> > > > >> > > >> > > > > > >>   add to their burden. Our implementation is based on
>>>> > > > >> > > >> > > > > > >>   Flink's checkpoint mechanism, which can benefit from
>>>> > > > >> > > >> > > > > > >>   asynchronous snapshots and incremental checkpoints. IMO,
>>>> > > > >> > > >> > > > > > >>   performance is not a problem, and we have not found any
>>>> > > > >> > > >> > > > > > >>   performance issues in production.
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> [1]:
>>>> > > > >> > > >> > > > > > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> Best,
>>>> > > > >> > > >> > > > > > >> Vino
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >> On Tue, 18 Jun 2019 at 14:27, Kurt Young <ykt...@gmail.com <mailto:ykt...@gmail.com>> wrote:
>>>> > > > >> > > >> > > > > > >>
>>>> > > > >> > > >> > > > > > >>> Yeah, sorry for not expressing myself clearly. I will try
>>>> > > > >> > > >> > > > > > >>> to provide more details to make sure we are on the same
>>>> > > > >> > > >> > > > > > >>> page.
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>> For the DataStream API, it shouldn't be optimized
>>>> > > > >> > > >> > > > > > >>> automatically. You have to explicitly call an API to do
>>>> > > > >> > > >> > > > > > >>> local aggregation, as well as specify the trigger policy
>>>> > > > >> > > >> > > > > > >>> of the local aggregation. Taking average as an example,
>>>> > > > >> > > >> > > > > > >>> the user program may look like this (just a draft):
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>> assuming the input type is DataStream<Tuple2<String, Int>>
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>> ds.localAggregate(
>>>> > > > >> > > >> > > > > > >>>        // The local key, which is the String from Tuple2
>>>> > > > >> > > >> > > > > > >>>        0,
>>>> > > > >> > > >> > > > > > >>>        // The partial aggregation function; produces
>>>> > > > >> > > >> > > > > > >>>        // Tuple2<Long, Int>, indicating sum and count
>>>> > > > >> > > >> > > > > > >>>        PartitionAvg(1),
>>>> > > > >> > > >> > > > > > >>>        // Trigger policy; note this should be best effort,
>>>> > > > >> > > >> > > > > > >>>        // and can also be composed with time-based or
>>>> > > > >> > > >> > > > > > >>>        // memory-size-based triggers
>>>> > > > >> > > >> > > > > > >>>        CountTrigger.of(1000L)
>>>> > > > >> > > >> > > > > > >>>    )
>>>> > > > >> > > >> > > > > > >>>    // The return type is the local aggregate
>>>> > > > >> > > >> > > > > > >>>    // Tuple2<String, Tuple2<Long, Int>>
>>>> > > > >> > > >> > > > > > >>>    .keyBy(0)      // Further keyBy it with the required key
>>>> > > > >> > > >> > > > > > >>>    .aggregate(1)  // Merge all the partial results to get the final average
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>> (This is only a draft, just to illustrate what it might
>>>> > > > >> > > >> > > > > > >>> look like.)
>>>> > > > >> > > >> > > > > > >>>
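The draft's key idea, that the local step emits an intermediate (sum, count) rather than an average, can be shown without any Flink dependency. A minimal sketch, assuming plain Java collections in place of DataStream: `SumCount`, `localAggregate` and `globalAverage` are hypothetical names standing in for `PartitionAvg(1)` and the final `.aggregate(1)`, and the trigger is ignored (each local call simply folds everything it is given).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Partial aggregate for an average: sum and count (mirrors the draft's Tuple2<Long, Int>).
final class SumCount {
    final long sum;
    final long count;

    SumCount(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    SumCount add(long value) {
        return new SumCount(sum + value, count + 1);
    }

    SumCount merge(SumCount other) {
        return new SumCount(sum + other.sum, count + other.count);
    }

    double average() {
        return (double) sum / count;
    }
}

class TwoPhaseAverage {
    // Local phase: one subtask folds its own records into per-key partials (no shuffle yet).
    static Map<String, SumCount> localAggregate(List<Map.Entry<String, Integer>> records) {
        Map<String, SumCount> partials = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            partials.merge(r.getKey(), new SumCount(0, 0).add(r.getValue()), SumCount::merge);
        }
        return partials;
    }

    // Global phase: after keyBy, merge the partials from every local subtask, then finish.
    static Map<String, Double> globalAverage(List<Map<String, SumCount>> partialsPerSubtask) {
        Map<String, SumCount> merged = new HashMap<>();
        for (Map<String, SumCount> partials : partialsPerSubtask) {
            partials.forEach((key, sc) -> merged.merge(key, sc, SumCount::merge));
        }
        Map<String, Double> result = new HashMap<>();
        merged.forEach((key, sc) -> result.put(key, sc.average()));
        return result;
    }

    public static void main(String[] args) {
        Map<String, SumCount> subtask1 =
                localAggregate(List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("a", 3)));
        Map<String, SumCount> subtask2 = localAggregate(List.of(Map.entry("a", 10)));
        System.out.println(globalAverage(List.of(subtask1, subtask2)).get("a")); // 4.0
    }
}
```

With these inputs, averaging the two local averages (2.0 and 10.0) would give 6.0, while merging the intermediate sums and counts gives the correct 16 / 4 = 4.0; this is why the partial result type has to be mergeable.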
>>>> > > > >> > > >> > > > > > >>> The local aggregate operator can be stateless; we can keep
>>>> > > > >> > > >> > > > > > >>> a memory buffer or another efficient data structure to
>>>> > > > >> > > >> > > > > > >>> improve aggregation performance.
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>> Let me know if you have any other questions.
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>> Best,
>>>> > > > >> > > >> > > > > > >>> Kurt
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>> wrote:
>>>> > > > >> > > >> > > > > > >>>
>>>> > > > >> > > >> > > > > > >>>> Hi Kurt,
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> Thanks for your reply.
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> Actually, I am not against you raising your design.
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> From your earlier description, I can only imagine that your
>>>> > > > >> > > >> > > > > > >>>> high-level implementation is about SQL and that the
>>>> > > > >> > > >> > > > > > >>>> optimization is internal to the API. Is it automatic? How
>>>> > > > >> > > >> > > > > > >>>> do you provide the configuration option for triggering
>>>> > > > >> > > >> > > > > > >>>> pre-aggregation?
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> Maybe once I get more information, it will sound more
>>>> > > > >> > > >> > > > > > >>>> reasonable.
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> IMO, first of all, it would be better to make your user
>>>> > > > >> > > >> > > > > > >>>> interface concrete; it's the basis of the discussion.
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> For example, can you give a code snippet showing how it
>>>> > > > >> > > >> > > > > > >>>> helps users handle data skew in jobs built with the
>>>> > > > >> > > >> > > > > > >>>> DataStream API?
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> If you give more details, we can discuss further. I think
>>>> > > > >> > > >> > > > > > >>>> that if one design introduces an explicit interface and the
>>>> > > > >> > > >> > > > > > >>>> other does not, the implementations will obviously differ.
>>>> > > > >> > > >> > > > > > >>>> For example, we introduce an explicit API in DataStream
>>>> > > > >> > > >> > > > > > >>>> named localKeyBy; for the pre-aggregation, we need to
>>>> > > > >> > > >> > > > > > >>>> define the trigger mechanism of local aggregation, so we
>>>> > > > >> > > >> > > > > > >>>> found that reusing the window API and operator is a good
>>>> > > > >> > > >> > > > > > >>>> choice. This is a chain of reasoning from design to
>>>> > > > >> > > >> > > > > > >>>> implementation.
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> What do you think?
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> Best,
>>>> > > > >> > > >> > > > > > >>>> Vino
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>> On Tue, 18 Jun 2019 at 11:58, Kurt Young <ykt...@gmail.com <mailto:ykt...@gmail.com>> wrote:
>>>> > > > >> > > >> > > > > > >>>>
>>>> > > > >> > > >> > > > > > >>>>> Hi Vino,
>>>> > > > >> > > >> > > > > > >>>>>
>>>> > > > >> > > >> > > > > > >>>>> Now I feel that we may have different understandings of
>>>> > > > >> > > >> > > > > > >>>>> what kind of problems or improvements you want to address.
>>>> > > > >> > > >> > > > > > >>>>> Currently, most of the feedback focuses on *how to do
>>>> > > > >> > > >> > > > > > >>>>> proper local aggregation to improve performance and maybe
>>>> > > > >> > > >> > > > > > >>>>> solve the data skew issue*. My gut feeling is that this is
>>>> > > > >> > > >> > > > > > >>>>> exactly what users want in the first place, especially
>>>> > > > >> > > >> > > > > > >>>>> those who gave +1s. (Sorry for trying to summarize here;
>>>> > > > >> > > >> > > > > > >>>>> please correct me if I'm wrong.)
>>>> > > > >> > > >> > > > > > >>>>>
>>>> > > > >> > > >> > > > > > >>>>> But I still think the design has somewhat diverged from
>>>> > > > >> > > >> > > > > > >>>>> the goal. If we want an efficient and powerful way to do
>>>> > > > >> > > >> > > > > > >>>>> local aggregation, supporting an intermediate result type
>>>> > > > >> > > >> > > > > > >>>>> is essential IMO. Both the runtime's `AggregateFunction`
>>>> > > > >> > > >> > > > > > >>>>> and SQL's `UserDefinedAggregateFunction` properly support
>>>> > > > >> > > >> > > > > > >>>>> an intermediate result type and can do a `merge` operation
>>>> > > > >> > > >> > > > > > >>>>> on it.
>>>> > > > >> > > >> > > > > > >>>>>
>>>> > > > >> > > >> > > > > > >>>>> Now we have a lightweight alternative which performs well
>>>> > > > >> > > >> > > > > > >>>>> and fits the local aggregation requirements nicely. Most
>>>> > > > >> > > >> > > > > > >>>>> importantly, it's much less complex because it's
>>>> > > > >> > > >> > > > > > >>>>> stateless. It can also achieve a similar
>>>> > > > >> > > >> > > > > > >>>>> multiple-aggregation scenario.
>>>> > > > >> > > >> > > > > > >>>>>
>>>> > > > >> > > >> > > > > > >>>>> I'm still not convinced that we shouldn't consider it as a
>>>> > > > >> > > >> > > > > > >>>>> first step.
>>>> > > > >> > > >> > > > > > >>>>>
>>>> > > > >> > > >> > > > > > >>>>> Best,
>>>> > > > >> > > >> > > > > > >>>>> Kurt
>
> On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com> wrote:
>>
>> Hi Kurt,
>>
>> Thanks for your comments.
>>
>> It seems we have both implemented a local aggregation feature to mitigate
>> data skew. However, IMHO, the two optimizations deliver their benefit at
>> different API levels.
>>
>> *Your optimization lives inside Flink SQL and is not user-facing. (If I
>> have misunderstood, please correct me.)*
>> *Our implementation exposes it as an optimization API for DataStream; it is
>> essentially a local version of the keyBy API.*
>>
>> Based on this, I think supporting it as a DataStream API provides these
>> advantages:
>>
>>   - The localKeyBy API has clear semantics, and it's flexible not only for
>>   handling data skew but also for implementing some user cases. For
>>   example, to compute a multiple-level aggregation, we can do it within the
>>   local aggregation:
>>
>>     input.localKeyBy("a").sum(1).localKeyBy("b").window();
>>
>>   Here "a" is a sub-category and "b" is a category, so we do not need to
>>   shuffle data over the network.
>>   - Users of the DataStream API will benefit from this. Actually, there are
>>   many scenarios that need the DataStream API. Currently, the DataStream API
>>   is the cornerstone of Flink SQL's physical plan. With a localKeyBy API,
>>   SQL optimization could at least use this optimized API, though that is a
>>   further topic.
>>   - Because it is based on the window operator, our state benefits from
>>   Flink state and checkpointing, so we do not need to worry about OOM or
>>   job failures.
>>
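The multiple-level case can be illustrated without Flink at all. Below is a plain-Java sketch of the intended semantics inside a single parallel task. It assumes that each sub-category belongs to exactly one category. `SaleEvent`, `TwoLevelLocalAgg` and the sample data are hypothetical, and `localKeyBy` is only the API proposed in this thread, not an existing Flink method.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical element shape, as seen by one parallel task.
final class SaleEvent {
    final String category;    // "b" in the thread's example
    final String subCategory; // "a" in the thread's example
    final long amount;

    SaleEvent(String category, String subCategory, long amount) {
        this.category = category;
        this.subCategory = subCategory;
        this.amount = amount;
    }
}

final class TwoLevelLocalAgg {
    // Level 1: pre-aggregate per sub-category inside this task.
    static Map<String, Long> sumBySubCategory(List<SaleEvent> events) {
        Map<String, Long> sums = new TreeMap<>();
        for (SaleEvent e : events) {
            sums.merge(e.subCategory, e.amount, Long::sum);
        }
        return sums;
    }

    // Level 2: roll sub-category sums up to their category, still inside this task.
    // No data leaves the task because each sub-category maps to exactly one category.
    static Map<String, Long> sumByCategory(Map<String, Long> subSums, Map<String, String> categoryOf) {
        Map<String, Long> sums = new TreeMap<>();
        subSums.forEach((sub, sum) -> sums.merge(categoryOf.get(sub), sum, Long::sum));
        return sums;
    }

    public static void main(String[] args) {
        List<SaleEvent> events = List.of(
                new SaleEvent("fruit", "apple", 3),
                new SaleEvent("fruit", "pear", 2),
                new SaleEvent("fruit", "apple", 5),
                new SaleEvent("dairy", "milk", 4));
        Map<String, Long> bySub = sumBySubCategory(events);
        Map<String, Long> byCat =
                sumByCategory(bySub, Map.of("apple", "fruit", "pear", "fruit", "milk", "dairy"));
        System.out.println(bySub); // {apple=8, milk=4, pear=2}
        System.out.println(byCat); // {dairy=4, fruit=10}
    }
}
```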
>> Now, about your questions:
>>
>> 1. About our design not being able to change the data type, and about the
>> implementation of average:
>>
>> As in my reply to Hequn, localKeyBy is an API provided to users who build
>> their jobs with the DataStream API. Users should know its semantics and how
>> it differs from the keyBy API, so if they want an average aggregation, they
>> should carry both the local sum and the local count.
>> I admit that using keyBy directly would be more convenient, but we pay a
>> small price for the benefits. I think this price is reasonable, considering
>> that the DataStream API itself is a low-level API (at least for now).
>>
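Here is a rough plain-Java model of the "carry the local sum and local count" workaround. It assumes the local operator can only apply a type-preserving sum, so each raw value is first widened to a hypothetical `(key, sum, count)` element with `count = 1`. The same combine is then used locally and globally, and the division happens once at the end. `SumCount` and `LocalAverageSketch` are illustrative names, not Flink types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical element type that already carries a count field,
// so a type-preserving "sum" can still produce an average later.
final class SumCount {
    final String key;
    final long sum;
    final long count;

    SumCount(String key, long sum, long count) {
        this.key = key;
        this.sum = sum;
        this.count = count;
    }

    // Type-preserving combine: SumCount + SumCount -> SumCount (same key assumed).
    SumCount plus(SumCount other) {
        return new SumCount(key, sum + other.sum, count + other.count);
    }
}

final class LocalAverageSketch {
    // Local stage: pre-aggregate one task's elements per key.
    static Map<String, SumCount> localSum(List<SumCount> in) {
        Map<String, SumCount> acc = new LinkedHashMap<>();
        for (SumCount r : in) {
            acc.merge(r.key, r, SumCount::plus);
        }
        return acc;
    }

    // Global stage: merge partials from all tasks per key, then divide once.
    static Map<String, Double> globalAverage(List<Map<String, SumCount>> partials) {
        Map<String, SumCount> total = new LinkedHashMap<>();
        for (Map<String, SumCount> p : partials) {
            p.values().forEach(r -> total.merge(r.key, r, SumCount::plus));
        }
        Map<String, Double> avg = new LinkedHashMap<>();
        total.forEach((k, r) -> avg.put(k, (double) r.sum / r.count));
        return avg;
    }

    public static void main(String[] args) {
        // Raw values 4, 6 (task 1) and 20 (task 2), each widened to (key, value, 1).
        Map<String, SumCount> task1 = localSum(List.of(new SumCount("k", 4, 1), new SumCount("k", 6, 1)));
        Map<String, SumCount> task2 = localSum(List.of(new SumCount("k", 20, 1)));
        System.out.println(globalAverage(List.of(task1, task2))); // {k=10.0}
    }
}
```

The cost is the one described above: the user has to reshape the element type before calling the local operator, instead of the operator doing it through a separate accumulator type.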
>> 2. About the stateless operator and
>> `StreamOperator::prepareSnapshotPreBarrier()`:
>>
>> Actually, I discussed this with @dianfu in the old mail thread. I will copy
>> my opinion from there:
>>
>>   - For your design, you still need somewhere for users to configure the
>>   trigger threshold (maybe memory availability?). This design cannot
>>   guarantee deterministic semantics, which makes testing and debugging
>>   harder.
>>   - If the implementation depends on checkpoint timing, it would affect
>>   checkpoint progress, and the buffered data may cause OOM issues. In
>>   addition, if the operator is stateless, it cannot provide fault
>>   tolerance.
>>
>> Best,
>> Vino
>>
>> Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 9:22 AM:
>>>
>>> Hi Vino,
>>>
>>> Thanks for the proposal. I like the general idea, and IMO it's a very
>>> useful feature.
>>> But after reading through the document, I feel that we may be
>>> over-designing the operator required for proper local aggregation. The
>>> main reason is that we want a clear definition and behavior for "local
>>> keyed state", which in my opinion is not necessary for local aggregation,
>>> at least to start with.
>>>
>>> Another issue I noticed is that the local keyBy operator cannot change the
>>> element type. This will also restrict many use cases that could benefit
>>> from local aggregation, like "average".
>>>
>>> We implemented similar logic in SQL, and the only thing needed was to
>>> introduce a stateless, lightweight operator that is *chained* before
>>> `keyBy()`. The operator flushes all buffered elements during
>>> `StreamOperator::prepareSnapshotPreBarrier()`, which keeps it stateless.
>>> By the way, an earlier version took a similar approach by introducing a
>>> stateful local aggregation operator, but it did not perform as well as
>>> the latter one and also affected barrier alignment time. The latter one
>>> is fairly simple and more efficient.
>>>
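Here is a rough plain-Java model of the stateless approach described above. Elements are combined per key in an in-memory buffer. When a checkpoint is about to be taken, everything is emitted downstream, so the buffer is empty at snapshot time and there is nothing to checkpoint. This sketch does not use Flink. `BufferedPreAggregator`, its `maxBufferedKeys` size trigger and the `downstream` callback are all hypothetical. Only the method name `prepareSnapshotPreBarrier(long checkpointId)` mirrors the `StreamOperator` hook mentioned in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of a stateless pre-aggregation operator chained before keyBy().
// It does NOT implement Flink's StreamOperator; it only borrows the hook's name.
final class BufferedPreAggregator {
    private final Map<String, Long> buffer = new HashMap<>();
    private final int maxBufferedKeys;                  // hypothetical size trigger
    private final BiConsumer<String, Long> downstream; // stands in for the output to keyBy()

    BufferedPreAggregator(int maxBufferedKeys, BiConsumer<String, Long> downstream) {
        this.maxBufferedKeys = maxBufferedKeys;
        this.downstream = downstream;
    }

    // Combine the element into the per-key partial sum; flush early if the buffer grows too large.
    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        if (buffer.size() >= maxBufferedKeys) {
            flush();
        }
    }

    // Invoked before the checkpoint barrier is forwarded: emit all partials so
    // nothing remains to snapshot, i.e. the operator stays stateless.
    void prepareSnapshotPreBarrier(long checkpointId) {
        flush();
    }

    int bufferedKeys() {
        return buffer.size();
    }

    private void flush() {
        buffer.forEach(downstream);
        buffer.clear();
    }

    public static void main(String[] args) {
        Map<String, Long> globalTotals = new HashMap<>();
        BufferedPreAggregator op =
                new BufferedPreAggregator(100, (k, v) -> globalTotals.merge(k, v, Long::sum));
        op.processElement("hot", 1);
        op.processElement("hot", 1);
        op.processElement("hot", 1);
        op.processElement("cold", 5);
        op.prepareSnapshotPreBarrier(1L); // buffer emptied before the barrier
        System.out.println(globalTotals.get("hot") + " " + op.bufferedKeys()); // 3 0
    }
}
```

The model also shows the trade-off raised earlier in the thread. When partials are emitted depends on `maxBufferedKeys` and on checkpoint timing, so the sequence of emitted partial results can vary between runs, even though the final per-key totals are the same.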
>>> I would strongly suggest considering a stateless approach as the first
>>> step.
>>>
>>> Best,
>>> Kurt
>>>
>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>> Hi Vino,
>>>>
>>>> Thanks for the proposal.
>>>>
>>>> Regarding "input.keyBy(0).sum(1)" vs
>>>> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you
>>>> done any benchmarks? I'm curious how much performance improvement we can
>>>> get by using a count window as the local operator.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
>>>>>
>>>>> Hi Hequn,
>>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> The purpose of the localKeyBy API is to provide a tool that lets users do
>>>>> pre-aggregation locally. The behavior of the pre-aggregation is similar
>>>>> to the keyBy API.
>>>>>
>>>>> So the three cases are different. I will describe them one by one:
>>>>>
>>>>> 1. input.keyBy(0).sum(1)
>>>>>
>>>>> *In this case, the result is event-driven: each event produces one sum
>>>>> aggregation result, which is the latest sum since the source started.*
>>>>>
>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>>>>
>>>>> *In this case, the semantics may be problematic. The local sum
>>>>> aggregation produces, for every event, the latest partial result since
>>>>> the source started.*
>>>>> *These latest partial results for the same key are hashed to one node
>>>>> for the global sum aggregation.*
>>>>> *The global aggregation receives multiple partial results (each computed
>>>>> since the source started), and summing them gives the wrong result.*
>>>>>
>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>>>>>
>>>>> *In this case, the local stage only produces a partial aggregation result
>>>>> for the 5 records in each count window. The partial aggregation results
>>>>> for the same key are then aggregated globally.*
>>>>>
>>>>> So the first and third cases can produce the *same* result; the
>>>>> difference is the output style and the latency.
>>>>>
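To make the difference between the three cases concrete, here is a plain-Java model for one key whose values 1..10 all pass through a single local task. It is a sketch under those assumptions, not Flink code. `rollingSum` models a rolling `sum(1)` on a keyed stream, and `countWindowSum` models a tumbling `countWindow(5)` that emits only complete windows. `ThreeCasesSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the three pipelines for a single key and a single local task.
final class ThreeCasesSketch {
    // Rolling sum: emits the running total after every element.
    static List<Long> rollingSum(List<Long> in) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (long v : in) {
            total += v;
            out.add(total);
        }
        return out;
    }

    // Tumbling count window: emits the sum of every complete group of n elements.
    // An incomplete trailing group is not emitted in this sketch.
    static List<Long> countWindowSum(List<Long> in, int n) {
        List<Long> out = new ArrayList<>();
        for (int start = 0; start + n <= in.size(); start += n) {
            long s = 0;
            for (int i = start; i < start + n; i++) {
                s += in.get(i);
            }
            out.add(s);
        }
        return out;
    }

    static long last(List<Long> xs) {
        return xs.get(xs.size() - 1);
    }

    public static void main(String[] args) {
        List<Long> input = new ArrayList<>();
        for (long v = 1; v <= 10; v++) {
            input.add(v);
        }
        long case1 = last(rollingSum(input));                    // keyBy(0).sum(1)
        long case2 = last(rollingSum(rollingSum(input)));        // localKeyBy(0).sum(1).keyBy(0).sum(1)
        long case3 = last(rollingSum(countWindowSum(input, 5))); // localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
        System.out.println(case1 + " " + case2 + " " + case3);   // 55 220 55
    }
}
```

Case 2 ends at 220 instead of 55 because the global stage adds up every running total (1 + 3 + 6 + ... + 55) rather than the increments. This is the double counting described for case 2. Case 3 reaches the same final value as case 1, but it emits two results instead of ten. That is the difference in output style and latency.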
>>>>> Generally speaking, the localKeyBy API is just an optimization API. We do
>>>>> not limit how users use it, but users have to understand its semantics
>>>>> and use it correctly.
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>> Hequn Cheng <chenghe...@gmail.com> wrote on Mon, Jun 17, 2019 at 4:18 PM:
>>>>>>
>>>>>> Hi Vino,
>>>>>>
>>>>>> Thanks for the proposal, I think it is a very good feature!
>>>>>>
>>>>>> One thing I want to make sure of is the semantics of `localKeyBy`. From
>>>>>> the document, the `localKeyBy` API returns an instance of `KeyedStream`,
>>>>>> which can also perform sum(). In this case, what are the semantics of
>>>>>> `localKeyBy()`? For example, will the following snippets produce the
>>>>>> same result, and what are the differences between them?
>>>>>>
>>>>>> 1. input.keyBy(0).sum(1)
>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>>>>>>
>>>>>> It would also be great if we could add this to the document. Thank you
>>>>>> very much.
>>>>>>
>>>>>> Best, Hequn
>>>>>>
>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Aljoscha,
>>>>>>>
>>>>>>> I have looked at the "*Process*" section of the FLIP wiki page [1]. This
>>>>>>> mail thread indicates that it has proceeded to the third step.
>>>>>>>
>>>>>>> When I looked at the fourth step (the vote step), I didn't find the
>>>>>>> prerequisites for starting the voting process.
>>>>>>>
>>>>>>> Considering that this feature has already been discussed in the old
>>>>>>> thread [2], can you tell me when I should start the vote? Can I start
>>>>>>> now?
>>>>>>>
>>>>>>> Best,
>>>>>>> Vino
>>>>>>>
>>>>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
>>>>>>> [2]: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>>>>>>>
>>>>>>> leesf <leesf0...@gmail.com> wrote on Thu, Jun 13, 2019 at 9:19 AM:
>>>>>>>>
>>>>>>>> +1 for the FLIP, thanks Vino for your efforts.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Leesf
>>>>>>>>
>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 12, 2019 at 5:46 PM:
>>>>>>>>>
>>>>>>>>> Hi folks,
>>>>>>>>>
>>>>>>>>> I would like to start the FLIP discussion thread about supporting local
>>>>>>>>> aggregation in Flink.
>>>>>>>>>
>>>>>>>>> In short, this feature can effectively alleviate data skew. This is the
>>>>>>>>> FLIP:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
>>>>>>>>>
>>>>>>>>> *Motivation* (copied from FLIP)
>>>>>>>>>
>>>>>>>>> Currently, keyed streams are widely used to perform aggregating
>>>>>>>>> operations (e.g., reduce, sum and window) on elements that have the
>>>>>>>>> same key. At runtime, elements with the same key are sent to and
>>>>>>>>> aggregated by the same task.
>>>>>>>>>
>>>>>>>>> The performance of these aggregating operations is very sensitive to
>>>>>>>>> the distribution of keys. When the key distribution follows a power
>>>>>>>>> law, performance degrades significantly. Worse, increasing the degree
>>>>>>>>> of parallelism does not help when a task is overloaded by a single
>>>>>>>>> key.
>>>>>>>>>
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> Local aggregation is a
>>>> widely-adopted
>>>> > > > >> method
>>>> > > > >> > to
>>>> > > > >> > > >> > > > > > >> reduce
>>>> > > > >> > > >> > > > > > >>>> the
>>>> > > > >> > > >> > > > > > >>>>>>>>>> performance
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> degraded by data skew. We can
>>>> decompose
>>>> > > > the
>>>> > > > >> > > >> > > > > > >> aggregating
>>>> > > > >> > > >> > > > > > >>>>>>>> operations
>>>> > > > >> > > >> > > > > > >>>>>>>>>> into
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>> two
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> phases. In the first phase, we
>>>> > > aggregate
>>>> > > > >> the
>>>> > > > >> > > >> elements
>>>> > > > >> > > >> > > > > > >>> of
>>>> > > > >> > > >> > > > > > >>>>> the
>>>> > > > >> > > >> > > > > > >>>>>>> same
>>>> > > > >> > > >> > > > > > >>>>>>>>> key
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> at
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> the sender side to obtain
>>>> partial
>>>> > > > results.
>>>> > > > >> > Then
>>>> > > > >> > > at
>>>> > > > >> > > >> > > > > > >> the
>>>> > > > >> > > >> > > > > > >>>>> second
>>>> > > > >> > > >> > > > > > >>>>>>>>> phase,
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>> these
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> partial results are sent to
>>>> receivers
>>>> > > > >> > according
>>>> > > > >> > > to
>>>> > > > >> > > >> > > > > > >>> their
>>>> > > > >> > > >> > > > > > >>>>> keys
>>>> > > > >> > > >> > > > > > >>>>>>> and
>>>> > > > >> > > >> > > > > > >>>>>>>>> are
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> combined to obtain the final
>>>> result.
>>>> > > > Since
>>>> > > > >> the
>>>> > > > >> > > >> number
>>>> > > > >> > > >> > > > > > >>> of
>>>> > > > >> > > >> > > > > > >>>>>>> partial
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> results
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> received by each receiver is
>>>> limited by
>>>> > > > the
>>>> > > > >> > > >> number of
>>>> > > > >> > > >> > > > > > >>>>>> senders,
>>>> > > > >> > > >> > > > > > >>>>>>>> the
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> imbalance among receivers can be
>>>> > > reduced.
>>>> > > > >> > > >> Besides, by
>>>> > > > >> > > >> > > > > > >>>>>> reducing
>>>> > > > >> > > >> > > > > > >>>>>>>> the
>>>> > > > >> > > >> > > > > > >>>>>>>>>>> amount
>>>> > > > >> > > >> > > > > > >>>>>>>>>>>>> of transferred data the
>>>> performance can
>>>> > > > be
>>>> > > > >> > > further
>>>> > > > >> > > >> > > > > > >>>>> improved.
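To make the two-phase idea concrete, here is a minimal, framework-free Java sketch. It is not Flink's API and not the FLIP's implementation: senders and receivers are modelled as plain lists and maps, and the class and method names (`TwoPhaseCount`, `localAggregate`, `globalAggregate`, etc.) are hypothetical. It assumes a simple count aggregation and hash partitioning by key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free simulation of two-phase (local + global) counting.
// This is NOT Flink API; senders/receivers are plain lists and maps.
public class TwoPhaseCount {

    // A partial result emitted by one sender: (key, partial count).
    static final class Partial {
        final String key;
        final long count;
        Partial(String key, long count) { this.key = key; this.count = count; }
    }

    // Phase 1: a sender pre-aggregates its own elements, emitting one
    // partial result per distinct key it has seen.
    static List<Partial> localAggregate(List<String> senderInput) {
        Map<String, Long> acc = new HashMap<>();
        for (String key : senderInput) {
            acc.merge(key, 1L, Long::sum);
        }
        List<Partial> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : acc.entrySet()) {
            out.add(new Partial(e.getKey(), e.getValue()));
        }
        return out;
    }

    // Hash partitioning: all records of a key are routed to the same receiver.
    static int receiverFor(String key, int numReceivers) {
        return Math.floorMod(key.hashCode(), numReceivers);
    }

    // Phase 2: partial results are shuffled by key and combined into
    // final counts; returns the state held by each receiver.
    static List<Map<String, Long>> globalAggregate(List<List<String>> senders, int numReceivers) {
        List<Map<String, Long>> receivers = new ArrayList<>();
        for (int i = 0; i < numReceivers; i++) {
            receivers.add(new HashMap<>());
        }
        for (List<String> input : senders) {
            for (Partial p : localAggregate(input)) {
                receivers.get(receiverFor(p.key, numReceivers)).merge(p.key, p.count, Long::sum);
            }
        }
        return receivers;
    }

    // Records shipped for one key WITHOUT local aggregation: every raw element.
    static long recordsWithoutLocalAgg(List<List<String>> senders, String key) {
        long n = 0;
        for (List<String> input : senders) {
            for (String k : input) {
                if (k.equals(key)) n++;
            }
        }
        return n;
    }

    // Records shipped for one key WITH local aggregation: at most one per sender.
    static long recordsWithLocalAgg(List<List<String>> senders, String key) {
        long n = 0;
        for (List<String> input : senders) {
            for (Partial p : localAggregate(input)) {
                if (p.key.equals(key)) n++;
            }
        }
        return n;
    }

    public static void main(String[] args) {
        // Skewed input: the key "hot" dominates every sender.
        List<List<String>> senders = List.of(
                List.of("hot", "hot", "hot", "a"),
                List.of("hot", "hot", "b"),
                List.of("hot", "hot", "hot", "hot", "a"));
        System.out.println(recordsWithoutLocalAgg(senders, "hot")); // prints 9
        System.out.println(recordsWithLocalAgg(senders, "hot"));    // prints 3
        System.out.println(globalAggregate(senders, 2));
    }
}
```

With the skewed input in `main`, the receiver that owns "hot" would get 9 raw records without local aggregation but only 3 partial results with it, i.e. at most one per sender, which is the bound the motivation describes. The final counts are the same either way, because counting is associative and can be split into partial sums.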
>>>>
>>>> *More details*:
>>>>
>>>> Design documentation:
>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
>>>>
>>>> Old discussion thread:
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>>>>
>>>> JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>
>>>>
>>>> We are looking forward to your feedback!
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>>
