Hi vino,

Sorry, but I don't see a consensus about reusing the window operator and
keeping the API design of localKeyBy. I do think we should definitely give
this topic more thought.

I will also try to loop in Stephan for this discussion.

Best,
Kurt


On Mon, Jun 24, 2019 at 3:26 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi all,
>
> I am happy we have a wonderful discussion and received many valuable
> opinions in the last few days.
>
> Now, let me try to summarize the points on which we have reached
> consensus regarding the changes in the design:
>
>    - provide a unified abstraction to support two kinds of implementation;
>    - reuse WindowOperator and try to enhance it so that the intermediate
>    result of the local aggregation can be buffered and flushed, to
>    support two kinds of implementation;
>    - keep the API design of localKeyBy, but declare as disabled the APIs
>    we cannot currently support, and provide a configurable API for users to
>    choose how to handle the intermediate result;
>
> The above three points have been updated in the design doc. Any
> questions, please let me know.
>
> @Aljoscha Krettek <aljos...@apache.org> What do you think? Any further
> comments?
>
> Best,
> Vino
>
> On Thu, Jun 20, 2019 at 2:02 PM vino yang <yanghua1...@gmail.com> wrote:
>
> > Hi Kurt,
> >
> > Thanks for your comments.
> >
> > It seems we have come to a consensus that local aggregation should be
> > used to alleviate the performance degradation caused by data skew. In
> > this FLIP, our key solution is to introduce local keyed partitioning to
> > achieve this goal.
> >
> > I also agree that we can benefit a lot from the usage of
> > AggregateFunction. In combination with localKeyBy, we can easily use it
> > to achieve local aggregation:
> >
> >    - input.localKeyBy(0).aggregate()
> >    - input.localKeyBy(0).window().aggregate()
> >
> >
> > I think the only problem here is the choice between
> >
> >    - (1) Introducing a new primitive called localKeyBy and implement
> >    local aggregation with existing operators, or
> >    - (2) Introducing an operator called localAggregation which is
> >    composed of a key selector, a window-like operator, and an aggregate
> >    function.
> >
> >
> > There may exist some optimization opportunities in providing a composite
> > interface for local aggregation. But at the same time, in my opinion, we
> > lose flexibility (or we need a certain amount of effort to achieve the
> > same flexibility).
> >
> > As said in the previous mails, we have many use cases where the
> > aggregation is very complicated and cannot be performed with
> > AggregateFunction. For example, users may perform windowed aggregations
> > according to time, data values, or even external storage. Typically, they
> > now use KeyedProcessFunction or customized triggers to implement these
> > aggregations. It's not easy to address data skew in such cases with a
> > composite interface for local aggregation.
> >
> > Given that the DataStream API is targeted exactly at these cases, where
> > the application logic is very complicated and optimization does not
> > matter, I think it's a better choice to provide a relatively low-level
> > and canonical interface.
> >
> > The composite interface, on the other hand, may be a good choice in
> > declarative interfaces, including SQL and Table API, as it allows more
> > optimization opportunities.
> >
> > Best,
> > Vino
> >
> >
> > On Thu, Jun 20, 2019 at 10:15 AM Kurt Young <ykt...@gmail.com> wrote:
> >
> >> Hi all,
> >>
> >> As vino said in previous emails, I think we should first discuss and
> >> decide what kind of use cases this FLIP wants to resolve, and what the
> >> API should look like. From my side, I think this is probably the root
> >> cause of the current divergence.
> >>
> >> My understanding (from the FLIP title and the motivation section of the
> >> document) is that we want to have proper support for local aggregation,
> >> or pre-aggregation. This is not a very new idea; most SQL engines have
> >> already made this improvement. The core concept is that there should be
> >> an AggregateFunction, no matter whether it's Flink runtime's
> >> AggregateFunction or SQL's UserDefinedAggregateFunction. Both kinds of
> >> aggregation have the concept of an intermediate data type, which we
> >> sometimes call ACC. I quickly went through the POC Piotr did before [1];
> >> it also directly uses AggregateFunction.
> >>
> >> But the thing is, after reading the design of this FLIP, I can't help
> >> feeling that this FLIP is not targeting proper local aggregation
> >> support. It actually wants to introduce another concept, LocalKeyBy,
> >> along with how to split and merge local key groups and how to properly
> >> support state on local keys. Local aggregation just happens to be one
> >> possible use case of LocalKeyBy. But it lacks support for the essential
> >> concept of local aggregation, which is the intermediate data type.
> >> Without this, I really don't think it is a good fit for local
> >> aggregation.
> >>
> >> Here I want to make sure of the scope or the goal of this FLIP: do we
> >> want to have a proper local aggregation engine, or do we just want to
> >> introduce a new concept called LocalKeyBy?
> >>
> >> [1]: https://github.com/apache/flink/pull/4626
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Wed, Jun 19, 2019 at 5:13 PM vino yang <yanghua1...@gmail.com> wrote:
> >>
> >> > Hi Hequn,
> >> >
> >> > Thanks for your comments!
> >> >
> >> > I agree that allowing local aggregation to reuse the window API and
> >> > refining the window operator so that it meets both requirements (ours
> >> > and Kurt's) is a good decision!
> >> >
> >> > Concerning your questions:
> >> >
> >> > 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may be
> >> > meaningless.
> >> >
> >> > Yes, it does not make sense in most cases. However, I also want to
> >> > note that users should know the right semantics of localKeyBy and use
> >> > it correctly. This issue also exists for the global keyBy. Consider
> >> > this example: input.keyBy(0).sum(1).keyBy(0).sum(1). The result is
> >> > also meaningless.
> >> >
> >> > 2. About the semantics of
> >> > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)).
> >> >
> >> > Good catch! I agree with you that it's not good to enable all
> >> > functionalities for localKeyBy from KeyedStream.
> >> > Currently, we do not support some APIs such as
> >> > connect/join/intervalJoin/coGroup. This is because we force the
> >> > operators on LocalKeyedStreams to be chained with their inputs.
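
A small sketch of why chaining two rolling sums, as in point 1 above, gives a
meaningless result. It models the rolling sum in plain Java with no Flink
dependency; the class and method names are hypothetical. Since a rolling sum
emits an updated total for every input element, the second sum adds up
running totals rather than raw values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of chaining two rolling sums; not Flink code.
class ChainedRollingSum {

    // Mimics a rolling sum: emits the updated total after every input element.
    static List<Long> rollingSum(List<Long> input) {
        List<Long> out = new ArrayList<>();
        long total = 0;
        for (long value : input) {
            total += value;
            out.add(total);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L); // all records share one key
        List<Long> first = rollingSum(values);          // running totals of the input
        List<Long> second = rollingSum(first);          // running totals of running totals
        System.out.println(first + " -> " + second);    // prints "[1, 3, 6] -> [1, 4, 10]"
    }
}
```

The true sum of the input is 6, but the last value of the chained result is
10, because the intermediate totals 1 and 3 are counted again.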
> >> >
> >> > Best,
> >> > Vino
> >> >
> >> >
> >> > On Wed, Jun 19, 2019 at 3:42 PM Hequn Cheng <chenghe...@gmail.com> wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > Thanks a lot for your great discussion and great to see that some
> >> > agreement
> >> > > has been reached on the "local aggregate engine"!
> >> > >
> >> > > ===> Considering the abstract engine,
> >> > > I'm thinking is it valuable for us to extend the current window to
> >> meet
> >> > > both demands raised by Kurt and Vino? There are some benefits we can
> >> get:
> >> > >
> >> > > 1. The interfaces of the window are complete and clear. With
> windows,
> >> we
> >> > > can define a lot of ways to split the data and perform different
> >> > > computations.
> >> > > 2. We can also leverage the window to do miniBatch for the global
> >> > > aggregation, i.e., we can use the window to bundle data belonging to
> >> > > the same key, so that for every bundle we only need to read and write
> >> > > state once. This can greatly reduce state IO and improve performance.
> >> > > 3. Many other use cases can also benefit from a window that is
> >> > > memory-based or stateless.
> >> > >
> >> > > ===> As for the API,
> >> > > I think it is good to make our API more flexible. However, we may
> >> need to
> >> > > make our API meaningful.
> >> > >
> >> > > Take my previous reply as an example,
> >> > > input.localKeyBy(0).sum(1).keyBy(0).sum(1). The result may be
> >> > meaningless.
> >> > > Another example I find is the intervalJoin, e.g.,
> >> > > input1.localKeyBy(0).intervalJoin(input2.localKeyBy(1)). In this
> >> > > case, it will bring problems if input1 and input2 have different
> >> > > parallelism: we don't know which input the join should be chained
> >> > > with. Even if they share the same parallelism, it's hard to tell what
> >> > > the join is doing. There may be some other problems.
> >> > >
> >> > > From this point of view, it's at least not good to enable all
> >> > > functionalities for localKeyBy from KeyedStream?
> >> > >
> >> > > Great to also have your opinions.
> >> > >
> >> > > Best, Hequn
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Wed, Jun 19, 2019 at 10:24 AM vino yang <yanghua1...@gmail.com>
> >> > wrote:
> >> > >
> >> > > > Hi Kurt and Piotrek,
> >> > > >
> >> > > > Thanks for your comments.
> >> > > >
> >> > > > I agree that we can provide a better abstraction to be compatible
> >> with
> >> > > two
> >> > > > different implementations.
> >> > > >
> >> > > > First of all, I think we should consider what kind of scenarios we
> >> need
> >> > > to
> >> > > > support in *API* level?
> >> > > >
> >> > > > We have some use cases which need customized aggregation through
> >> > > > KeyedProcessFunction (when using our localKeyBy.window, they can
> >> > > > use ProcessWindowFunction).
> >> > > >
> >> > > > Shall we support these flexible use scenarios?
> >> > > >
> >> > > > Best,
> >> > > > Vino
> >> > > >
> >> > > > On Tue, Jun 18, 2019 at 8:37 PM Kurt Young <ykt...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi Piotr,
> >> > > > >
> >> > > > > Thanks for joining the discussion. Making "local aggregation"
> >> > > > > abstract enough sounds good to me; we could implement and verify
> >> > > > > alternative solutions for use cases of local aggregation. Maybe
> >> > > > > we will find that both solutions are appropriate for different
> >> > > > > scenarios.
> >> > > > >
> >> > > > > Starting from a simple one sounds like a practical way to go.
> >> > > > > What do you think, vino?
> >> > > > >
> >> > > > > Best,
> >> > > > > Kurt
> >> > > > >
> >> > > > >
> >> > > > > On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <
> >> pi...@ververica.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hi Kurt and Vino,
> >> > > > > >
> >> > > > > > I think there is a trade-off that we need to consider for the
> >> > > > > > local aggregation.
> >> > > > > >
> >> > > > > > Generally speaking, I would agree with Kurt about local
> >> > > > > > aggregation/pre-aggregation not using Flink's state and instead
> >> > > > > > flushing the operator on a checkpoint. Network IO is usually
> >> > > > > > cheaper compared to disk IO. This has, however, a couple of
> >> > > > > > issues:
> >> > > > > > 1. It can explode the number of in-flight records during
> >> > > > > > checkpoint barrier alignment, making checkpointing slower and
> >> > > > > > decreasing the actual throughput.
> >> > > > > > 2. This trades disk IO on the local aggregation machine for CPU
> >> > > > > > (and disk IO in case of RocksDB) on the final aggregation
> >> > > > > > machine. This is fine, as long as there is no huge data skew.
> >> > > > > > If there are only a handful of hot keys (or even a single one),
> >> > > > > > it might be better to keep the persistent state in the
> >> > > > > > LocalAggregationOperator to offload the final aggregation as
> >> > > > > > much as possible.
> >> > > > > > 3. With frequent checkpointing, local aggregation effectiveness
> >> > > > > > would degrade.
> >> > > > > >
> >> > > > > > I assume Kurt is correct that in your use cases the stateless
> >> > > > > > operator was behaving better, but I could easily see other use
> >> > > > > > cases as well. For example, someone is already using RocksDB,
> >> > > > > > and their job is bottlenecked on a single window operator
> >> > > > > > instance because of data skew. In that case stateful local
> >> > > > > > aggregation would probably be a better choice.
> >> > > > > >
> >> > > > > > Because of that, I think we should eventually provide both
> >> > > > > > versions, and in the initial version we should at least make
> >> > > > > > the "local aggregation engine" abstract enough that one could
> >> > > > > > easily provide a different implementation strategy.
> >> > > > > >
> >> > > > > > Piotrek
> >> > > > > >
> >> > > > > > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com>
> >> wrote:
> >> > > > > > >
> >> > > > > > > Hi,
> >> > > > > > >
> >> > > > > > > For the trigger, it depends on what operator we want to use
> >> > > > > > > under the API. If we choose to use the window operator, we
> >> > > > > > > should also use the window's trigger. However, I also think
> >> > > > > > > reusing the window operator for this scenario may not be the
> >> > > > > > > best choice. The reasons are the following:
> >> > > > > > >
> >> > > > > > > 1. As a lot of people already pointed out, window relies
> >> > > > > > > heavily on state, and it will definitely affect performance.
> >> > > > > > > You can argue that one can use a heap-based state backend, but
> >> > > > > > > this will introduce extra coupling, especially when we have a
> >> > > > > > > chance to design a purely stateless operator.
> >> > > > > > > 2. The window operator is *the most* complicated operator
> >> > > > > > > Flink currently has. Maybe we only need to pick a subset of
> >> > > > > > > the window operator to achieve the goal, but once the user
> >> > > > > > > wants to have a deep look at the localAggregation operator,
> >> > > > > > > it's still hard to find out what's going on under the window
> >> > > > > > > operator. For simplicity, I would also recommend we introduce
> >> > > > > > > a dedicated lightweight operator, which is also much easier
> >> > > > > > > for a user to learn and use.
> >> > > > > > >
> >> > > > > > > For your question about increasing the burden in
> >> > > > > > > `StreamOperator::prepareSnapshotPreBarrier()`, the only thing
> >> > > > > > > this function needs to do is output all the partial results.
> >> > > > > > > It's a purely CPU workload, not introducing any IO. I want to
> >> > > > > > > point out that even if we have this cost, we reduce another
> >> > > > > > > barrier alignment cost of the operator, which is the sync
> >> > > > > > > flush stage of the state, if you introduce state. This flush
> >> > > > > > > actually will introduce disk IO, and I think it's worth
> >> > > > > > > exchanging this cost for a purely CPU workload. And we do have
> >> > > > > > > some observations about these two behaviors (as I said before,
> >> > > > > > > we actually implemented both solutions): the stateless one
> >> > > > > > > actually performs better both in performance and barrier
> >> > > > > > > alignment time.
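
To make the flush-before-barrier idea above concrete, here is a minimal
sketch in plain Java. It is not Flink code: the class BufferingLocalSum, its
count-based trigger, and the emitted list (standing in for the downstream
output) are assumptions made for illustration. Only the method name
prepareSnapshotPreBarrier is borrowed from the hook discussed above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: a pre-aggregator that buffers partial
// sums on the heap and keeps nothing in Flink managed state.
class BufferingLocalSum {
    private final Map<String, Long> buffer = new HashMap<>();
    private final int maxBufferedElements; // assumed count-based trigger
    private int buffered = 0;

    // Stands in for the operator's downstream output.
    final List<Map.Entry<String, Long>> emitted = new ArrayList<>();

    BufferingLocalSum(int maxBufferedElements) {
        this.maxBufferedElements = maxBufferedElements;
    }

    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        buffered++;
        if (buffered >= maxBufferedElements) {
            flush();
        }
    }

    // Plays the role of StreamOperator::prepareSnapshotPreBarrier():
    // emit every partial result before the barrier is forwarded.
    void prepareSnapshotPreBarrier() {
        flush();
    }

    int bufferedKeys() {
        return buffer.size();
    }

    private void flush() {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            emitted.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        buffer.clear();
        buffered = 0;
    }

    public static void main(String[] args) {
        BufferingLocalSum op = new BufferingLocalSum(1000);
        op.processElement("a", 1L);
        op.processElement("a", 2L);
        op.processElement("b", 5L);
        op.prepareSnapshotPreBarrier();
        System.out.println("emitted=" + op.emitted.size() + ", buffered keys=" + op.bufferedKeys());
    }
}
```

In this sketch the buffer is empty whenever a snapshot would be taken, so
there is nothing to persist; the cost is that all partial results are pushed
downstream at every checkpoint.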
> >> > > > > > >
> >> > > > > > > Best,
> >> > > > > > > Kurt
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <
> >> yanghua1...@gmail.com
> >> > >
> >> > > > > wrote:
> >> > > > > > >
> >> > > > > > >> Hi Kurt,
> >> > > > > > >>
> >> > > > > > >> Thanks for your example. Now, it is clearer to me.
> >> > > > > > >>
> >> > > > > > >> From your example code snippet, I saw the localAggregate
> API
> >> has
> >> > > > three
> >> > > > > > >> parameters:
> >> > > > > > >>
> >> > > > > > >>   1. key field
> >> > > > > > >>   2. PartitionAvg
> >> > > > > > >>   3. CountTrigger: Does this trigger come from the window package?
> >> > > > > > >>
> >> > > > > > >> I will compare our and your design from API and operator
> >> level:
> >> > > > > > >>
> >> > > > > > >> *From the API level:*
> >> > > > > > >>
> >> > > > > > >> As I replied to @dianfu in the old email thread [1], the
> >> > > > > > >> Window API can provide the second and the third parameter
> >> > > > > > >> right now.
> >> > > > > > >>
> >> > > > > > >> If you reuse a specific interface or class, such as *Trigger*
> >> > > > > > >> or *CountTrigger* provided by the window package, but do not
> >> > > > > > >> use the window API, it's not reasonable.
> >> > > > > > >> And if you do not reuse these interfaces or classes, you
> >> > > > > > >> would need to introduce more things, even though they look
> >> > > > > > >> similar to the things provided by the window package.
> >> > > > > > >>
> >> > > > > > >> The window package has provided several types of the window
> >> and
> >> > > many
> >> > > > > > >> triggers and let users customize it. What's more, the user
> is
> >> > more
> >> > > > > > familiar
> >> > > > > > >> with Window API.
> >> > > > > > >>
> >> > > > > > >> This is the reason why we just provide localKeyBy API and
> >> reuse
> >> > > the
> >> > > > > > window
> >> > > > > > >> API. It reduces unnecessary components such as triggers and
> >> the
> >> > > > > > mechanism
> >> > > > > > >> of buffer (based on count num or time).
> >> > > > > > >> And it has a clear and easy to understand semantics.
> >> > > > > > >>
> >> > > > > > >> *From the operator level:*
> >> > > > > > >>
> >> > > > > > >> We reused window operator, so we can get all the benefits
> >> from
> >> > > state
> >> > > > > and
> >> > > > > > >> checkpoint.
> >> > > > > > >>
> >> > > > > > >> From your design, you say the operator under the
> >> > > > > > >> localAggregate API is a *stateless* operator. IMO, it still
> >> > > > > > >> has state; it is just not Flink managed state.
> >> > > > > > >> About the memory buffer (I think it's still not very clear;
> >> > > > > > >> if you have time, can you give more detailed information or
> >> > > > > > >> answer my questions), I have some questions:
> >> > > > > > >>
> >> > > > > > >>   - if it is just a raw JVM heap memory buffer, how does it
> >> > > > > > >>   support fault tolerance if the job is configured with an
> >> > > > > > >>   EXACTLY-ONCE semantic guarantee?
> >> > > > > > >>   - if you think the memory buffer (non-Flink state) has
> >> > > > > > >>   better performance: in our design, users can also configure
> >> > > > > > >>   the HEAP state backend to get performance close to your
> >> > > > > > >>   mechanism.
> >> > > > > > >>   - `StreamOperator::prepareSnapshotPreBarrier()` is related
> >> > > > > > >>   to the timing of the snapshot. IMO, the flush action should
> >> > > > > > >>   be a synchronized action? (If not, please point out my
> >> > > > > > >>   mistake.) I still think we should not depend on the timing
> >> > > > > > >>   of the checkpoint. Checkpoint-related operations are
> >> > > > > > >>   inherently performance sensitive; we should not increase
> >> > > > > > >>   their burden any more. Our implementation is based on the
> >> > > > > > >>   mechanism of Flink's checkpoint, which can benefit from
> >> > > > > > >>   async snapshots and incremental checkpoints. IMO, the
> >> > > > > > >>   performance is not a problem, and we have also not found
> >> > > > > > >>   performance issues in our production.
> >> > > > > > >>
> >> > > > > > >> [1]:
> >> > > > > > >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> >> > > > > > >>
> >> > > > > > >> Best,
> >> > > > > > >> Vino
> >> > > > > > >>
> >> > > > > > >> On Tue, Jun 18, 2019 at 2:27 PM Kurt Young <ykt...@gmail.com> wrote:
> >> > > > > > >>
> >> > > > > > >>> Yeah, sorry for not expressing myself clearly. I will try
> >> > > > > > >>> to provide more details to make sure we are on the same page.
> >> > > > > > >>>
> >> > > > > > >>> For DataStream API, it shouldn't be optimized automatically.
> >> > > > > > >>> You have to explicitly call an API to do local aggregation
> >> > > > > > >>> and to specify the trigger policy of the local aggregation.
> >> > > > > > >>> Take average for example; the user program may look like
> >> > > > > > >>> this (just a draft):
> >> > > > > > >>>
> >> > > > > > >>> Assuming the input type is DataStream<Tuple2<String, Int>>:
> >> > > > > > >>>
> >> > > > > > >>> ds.localAggregate(
> >> > > > > > >>>        0,                      // The local key, which is the String from Tuple2
> >> > > > > > >>>        PartitionAvg(1),        // The partial aggregation function, produces
> >> > > > > > >>>                                // Tuple2<Long, Int>, indicating sum and count
> >> > > > > > >>>        CountTrigger.of(1000L)  // Trigger policy; note this should be best effort,
> >> > > > > > >>>                                // and can also be combined with a time based or
> >> > > > > > >>>                                // memory size based trigger
> >> > > > > > >>>    )                           // The return type is the local aggregate
> >> > > > > > >>>                                // Tuple2<String, Tuple2<Long, Int>>
> >> > > > > > >>>    .keyBy(0)                   // Further keyBy it with the required key
> >> > > > > > >>>    .aggregate(1)               // This will merge all the partial results and
> >> > > > > > >>>                                // get the final average.
> >> > > > > > >>>
> >> > > > > > >>> (This is only a draft, only trying to explain what it looks like.)
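
For readers who want to see the two phases of the draft above spelled out,
here is a runnable plain-Java sketch of the same idea. It does not use Flink:
localAggregate and PartitionAvg do not exist as shown in the draft, so the
class TwoPhaseAverage, the SumCount accumulator, and both method names below
are hypothetical stand-ins. The point is the intermediate type: each subtask
emits a partial (sum, count) per key, and the final stage merges partials
before dividing.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: two-phase average with a
// (sum, count) intermediate type.
class TwoPhaseAverage {

    // The intermediate data type ("ACC"): it differs from both the input
    // type (a single value) and the result type (a double average).
    static final class SumCount {
        long sum;
        long count;

        void add(long value) { sum += value; count++; }
        void merge(SumCount other) { sum += other.sum; count += other.count; }
        double average() { return (double) sum / count; }
    }

    // Phase 1: one subtask pre-aggregates its own slice of the input per key.
    static Map<String, SumCount> localAggregate(String[] keys, long[] values) {
        Map<String, SumCount> partials = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            partials.computeIfAbsent(keys[i], k -> new SumCount()).add(values[i]);
        }
        return partials;
    }

    // Phase 2: after the shuffle by key, merge the partials and finish.
    static Map<String, Double> finalAggregate(List<Map<String, SumCount>> allPartials) {
        Map<String, SumCount> merged = new HashMap<>();
        for (Map<String, SumCount> partials : allPartials) {
            for (Map.Entry<String, SumCount> e : partials.entrySet()) {
                merged.computeIfAbsent(e.getKey(), k -> new SumCount()).merge(e.getValue());
            }
        }
        Map<String, Double> averages = new HashMap<>();
        for (Map.Entry<String, SumCount> e : merged.entrySet()) {
            averages.put(e.getKey(), e.getValue().average());
        }
        return averages;
    }

    public static void main(String[] args) {
        Map<String, SumCount> subtask1 =
                localAggregate(new String[] {"a", "a", "b"}, new long[] {1, 2, 10});
        Map<String, SumCount> subtask2 =
                localAggregate(new String[] {"a", "b"}, new long[] {6, 20});
        Map<String, Double> avg = finalAggregate(Arrays.asList(subtask1, subtask2));
        System.out.println("a=" + avg.get("a") + ", b=" + avg.get("b")); // prints "a=3.0, b=15.0"
    }
}
```

Averaging the per-subtask averages instead would give the wrong answer for
key "a" (1.5 and 6.0 average to 3.75, not 3.0), which is why the partial
result has to carry both sum and count.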
> >> > > > > > >>>
> >> > > > > > >>> The local aggregate operator can be stateless; we can keep
> >> > > > > > >>> a memory buffer or other efficient data structure to improve
> >> > > > > > >>> the aggregate performance.
> >> > > > > > >>>
> >> > > > > > >>> Let me know if you have any other questions.
> >> > > > > > >>>
> >> > > > > > >>> Best,
> >> > > > > > >>> Kurt
> >> > > > > > >>>
> >> > > > > > >>>
> >> > > > > > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <
> >> > yanghua1...@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > > >>>
> >> > > > > > >>>> Hi Kurt,
> >> > > > > > >>>>
> >> > > > > > >>>> Thanks for your reply.
> >> > > > > > >>>>
> >> > > > > > >>>> Actually, I am not against you raising your design.
> >> > > > > > >>>>
> >> > > > > > >>>> From your description before, I can only imagine that your
> >> > > > > > >>>> high-level implementation is about SQL and that the
> >> > > > > > >>>> optimization is internal to the API. Is it automatic? How
> >> > > > > > >>>> is the configuration option for triggering pre-aggregation
> >> > > > > > >>>> provided?
> >> > > > > > >>>>
> >> > > > > > >>>> Maybe after I get more information, it will sound more
> >> > > > > > >>>> reasonable.
> >> > > > > > >>>>
> >> > > > > > >>>> IMO, first of all, it would be better to make your user
> >> > > > > > >>>> interface concrete; it's the basis of the discussion.
> >> > > > > > >>>>
> >> > > > > > >>>> For example, can you give an example code snippet to show
> >> > > > > > >>>> how to help users deal with data skew in jobs built with
> >> > > > > > >>>> the DataStream API?
> >> > > > > > >>>>
> >> > > > > > >>>> If you give more details we can discuss further. I think
> >> > > > > > >>>> if one design introduces an exact interface and another
> >> > > > > > >>>> does not, the implementations have an obvious difference.
> >> > > > > > >>>> For example, we introduce an exact API in DataStream named
> >> > > > > > >>>> localKeyBy; for the pre-aggregation we need to define the
> >> > > > > > >>>> trigger mechanism of local aggregation, so we find that
> >> > > > > > >>>> reusing the window API and operator is a good choice. This
> >> > > > > > >>>> is a reasoning link from design to implementation.
> >> > > > > > >>>>
> >> > > > > > >>>> What do you think?
> >> > > > > > >>>>
> >> > > > > > >>>> Best,
> >> > > > > > >>>> Vino
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>> On Tue, Jun 18, 2019 at 11:58 AM Kurt Young <ykt...@gmail.com> wrote:
> >> > > > > > >>>>
> >> > > > > > >>>>> Hi Vino,
> >> > > > > > >>>>>
> >> > > > > > >>>>> Now I feel that we may have different understandings about
> >> > > > > > >>>>> what kind of problems or improvements you want to resolve.
> >> > > > > > >>>>> Currently, most of the feedback is focusing on *how to do
> >> > > > > > >>>>> a proper local aggregation to improve performance and
> >> > > > > > >>>>> maybe solve the data skew issue*. And my gut feeling is
> >> > > > > > >>>>> that this is exactly what users want in the first place,
> >> > > > > > >>>>> especially those +1s. (Sorry to try to summarize here;
> >> > > > > > >>>>> please correct me if I'm wrong.)
> >> > > > > > >>>>>
> >> > > > > > >>>>> But I still think the design has somehow diverged from the
> >> > > > > > >>>>> goal. If we want to have an efficient and powerful way to
> >> > > > > > >>>>> do local aggregation, supporting an intermediate result
> >> > > > > > >>>>> type is essential IMO. Both runtime's `AggregateFunction`
> >> > > > > > >>>>> and SQL's `UserDefinedAggregateFunction` have proper
> >> > > > > > >>>>> support for an intermediate result type and can do a
> >> > > > > > >>>>> `merge` operation on them.
> >> > > > > > >>>>>
> >> > > > > > >>>>> Now, we have a lightweight alternative which performs
> >> > > > > > >>>>> well and is a nice fit for the local aggregation
> >> > > > > > >>>>> requirements. Most importantly, it's much less complex
> >> > > > > > >>>>> because it's stateless. And it can also achieve the
> >> > > > > > >>>>> similar multiple-aggregation scenario.
> >> > > > > > >>>>>
> >> > > > > > >>>>> I am still not convinced why we shouldn't consider it as
> >> > > > > > >>>>> a first step.
> >> > > > > > >>>>>
> >> > > > > > >>>>> Best,
> >> > > > > > >>>>> Kurt
> >> > > > > > >>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang <
> >> > > > yanghua1...@gmail.com>
> >> > > > > > >>>> wrote:
> >> > > > > > >>>>>
> >> > > > > > >>>>>> Hi Kurt,
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Thanks for your comments.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> It seems we both implemented a local aggregation feature
> >> > > > > > >>>>>> to address the issue of data skew.
> >> > > > > > >>>>>> However, IMHO, the API level at which the optimization
> >> > > > > > >>>>>> pays off is different.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> *Your optimization benefits from Flink SQL and is not
> >> > > > > > >>>>>> user-facing. (If I understand it incorrectly, please
> >> > > > > > >>>>>> correct me.)*
> >> > > > > > >>>>>> *Our implementation exposes it as an optimization tool
> >> > > > > > >>>>>> API for DataStream; it is just like a local version of
> >> > > > > > >>>>>> the keyBy API.*
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Based on this, I want to say that supporting it as a
> >> > > > > > >>>>>> DataStream API can provide these advantages:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>   - The localKeyBy API has clear semantics and is
> >> > > > > > >>>>>>   flexible, not only for processing data skew but also
> >> > > > > > >>>>>>   for implementing some use cases. For example, if we
> >> > > > > > >>>>>>   want to calculate a multiple-level aggregation, we can
> >> > > > > > >>>>>>   do it in the local aggregation:
> >> > > > > > >>>>>>   input.localKeyBy("a").sum(1).localKeyBy("b").window();
> >> > > > > > >>>>>>   // here "a" is a sub-category, while "b" is a category;
> >> > > > > > >>>>>>   here we do not need to shuffle data in the network.
> >> > > > > > >>>>>>   - The users of the DataStream API will benefit from
> >> > > > > > >>>>>>   this. Actually, we have a lot of scenarios that need
> >> > > > > > >>>>>>   the DataStream API. Currently, the DataStream API is
> >> > > > > > >>>>>>   the cornerstone of the physical plan of Flink SQL. With
> >> > > > > > >>>>>>   a localKeyBy API, the optimization of SQL may at least
> >> > > > > > >>>>>>   use this optimized API; this is a further topic.
> >> > > > > > >>>>>>   - Based on the window operator, our state would benefit
> >> > > > > > >>>>>>   from Flink State and checkpointing; we do not need to
> >> > > > > > >>>>>>   worry about OOM and job failures.
> >> > > > > > >>>>>> Now, about your questions:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> 1. About our design not being able to change the data
> >> > > > > > >>>>>> type, and about the implementation of average:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Just like my reply to Hequn, localKeyBy is an API
> >> > > > > > >>>>>> provided to the users who use the DataStream API to build
> >> > > > > > >>>>>> their jobs.
> >> > > > > > >>>>>> Users should know its semantics and how it differs from
> >> > > > > > >>>>>> the keyBy API, so if they want the average aggregation,
> >> > > > > > >>>>>> they should carry the local sum result and the local
> >> > > > > > >>>>>> count result.
> >> > > > > > >>>>>> I admit that it would be convenient to use keyBy
> >> > > > > > >>>>>> directly. But we need to pay a small price to get some
> >> > > > > > >>>>>> benefits. I think this price is reasonable, considering
> >> > > > > > >>>>>> that the DataStream API itself is a low-level API (at
> >> > > > > > >>>>>> least for now).
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> 2. About stateless operator and
> >> > > > > > >>>>>> `StreamOperator::prepareSnapshotPreBarrier()`:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Actually, I have discussed this opinion with @dianfu in
> >> > > > > > >>>>>> the old mail thread. I will copy my opinion from there:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>   - for your design, you still need somewhere to let the
> >> > > > > > >>>>>>   users configure the trigger threshold (maybe memory
> >> > > > > > >>>>>>   availability?); this design cannot guarantee
> >> > > > > > >>>>>>   deterministic semantics (it will bring trouble for
> >> > > > > > >>>>>>   testing and debugging).
> >> > > > > > >>>>>>   - if the implementation depends on the timing of the
> >> > > > > > >>>>>>   checkpoint, it would affect the checkpoint's progress,
> >> > > > > > >>>>>>   and the buffered data may cause OOM issues. In
> >> > > > > > >>>>>>   addition, if the operator is stateless, it cannot
> >> > > > > > >>>>>>   provide fault tolerance.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Best,
> >> > > > > > >>>>>> Vino
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Kurt Young <ykt...@gmail.com> wrote on Tue, Jun 18, 2019 at 9:22 AM:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>> Hi Vino,
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Thanks for the proposal. I like the general idea, and IMO it's a very
> >> > > > > > >>>>>>> useful feature.
> >> > > > > > >>>>>>> But after reading through the document, I feel that we may be
> >> > > > > > >>>>>>> over-designing the operator required for proper local aggregation.
> >> > > > > > >>>>>>> The main reason is that we want a clear definition and behavior for
> >> > > > > > >>>>>>> the "local keyed state", which in my opinion is not necessary for
> >> > > > > > >>>>>>> local aggregation, at least to start with.
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Another issue I noticed is that the local key by operator cannot
> >> > > > > > >>>>>>> change the element type, which will also restrict a lot of use cases
> >> > > > > > >>>>>>> that could benefit from local aggregation, like "average".
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> We also implemented similar logic in SQL, and the only thing that
> >> > > > > > >>>>>>> needs to be done is to introduce a stateless lightweight operator
> >> > > > > > >>>>>>> which is *chained* before `keyby()`. The operator flushes all buffered
> >> > > > > > >>>>>>> elements during `StreamOperator::prepareSnapshotPreBarrier()`, which
> >> > > > > > >>>>>>> makes it stateless.
> >> > > > > > >>>>>>> By the way, in an earlier version we took a similar approach by
> >> > > > > > >>>>>>> introducing a stateful local aggregation operator, but it did not
> >> > > > > > >>>>>>> perform as well as the later one, and it also affected the barrier
> >> > > > > > >>>>>>> alignment time. The later one is fairly simple and more efficient.
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> I would highly suggest that you consider a stateless approach as the
> >> > > > > > >>>>>>> first step.
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Best,
> >> > > > > > >>>>>>> Kurt
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>> Hi Vino,
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Thanks for the proposal.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Regarding "input.keyBy(0).sum(1)" vs
> >> > > > > > >>>>>>>> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you
> >> > > > > > >>>>>>>> done any benchmarks?
> >> > > > > > >>>>>>>> I'm curious about how much performance improvement we can get by
> >> > > > > > >>>>>>>> using a count window as the local operator.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Best,
> >> > > > > > >>>>>>>> Jark
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>>> Hi Hequn,
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> Thanks for your reply.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> The purpose of the localKeyBy API is to provide a tool that lets
> >> > > > > > >>>>>>>>> users do pre-aggregation locally. The behavior of the
> >> > > > > > >>>>>>>>> pre-aggregation is similar to the keyBy API.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> So the three cases are different. I will describe them one by one:
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> 1. input.keyBy(0).sum(1)
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> *In this case, the result is event-driven: each event produces one
> >> > > > > > >>>>>>>>> sum aggregation result, which is the latest one since the source
> >> > > > > > >>>>>>>>> started.*
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> *In this case, the semantics may be problematic: it would do the
> >> > > > > > >>>>>>>>> local sum aggregation and produce, for every event, the latest
> >> > > > > > >>>>>>>>> partial result since the source started.*
> >> > > > > > >>>>>>>>> *These latest partial results for the same key are hashed to one
> >> > > > > > >>>>>>>>> node to do the global sum aggregation.*
> >> > > > > > >>>>>>>>> *In the global aggregation, when it receives multiple partial
> >> > > > > > >>>>>>>>> results (all calculated from the source start) and sums them, it
> >> > > > > > >>>>>>>>> will get the wrong result.*
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> *In this case, it would just get a partial aggregation result for
> >> > > > > > >>>>>>>>> the 5 records in the count window. The partial aggregation results
> >> > > > > > >>>>>>>>> from the same key will be aggregated globally.*
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> So the first case and the third case can get the *same* result; the
> >> > > > > > >>>>>>>>> difference is the output style and the latency.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> Generally speaking, the local key API is just an optimization API.
> >> > > > > > >>>>>>>>> We do not limit the user's usage, but the user has to understand
> >> > > > > > >>>>>>>>> its semantics and use it correctly.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> Best,
> >> > > > > > >>>>>>>>> Vino
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> Hequn Cheng <chenghe...@gmail.com> wrote on Mon, Jun 17, 2019 at 4:18 PM:
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>>> Hi Vino,
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> Thanks for the proposal, I think it is a very good feature!
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> One thing I want to make sure of is the semantics of `localKeyBy`.
> >> > > > > > >>>>>>>>>> From the document, the `localKeyBy` API returns an instance of
> >> > > > > > >>>>>>>>>> `KeyedStream` which can also perform sum(), so in this case, what
> >> > > > > > >>>>>>>>>> are the semantics of `localKeyBy()`? For example, will the
> >> > > > > > >>>>>>>>>> following code produce the same result, and what are the
> >> > > > > > >>>>>>>>>> differences between them?
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> 1. input.keyBy(0).sum(1)
> >> > > > > > >>>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> >> > > > > > >>>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> It would also be great if we could add this to the document. Thank
> >> > > > > > >>>>>>>>>> you very much.
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> Best, Hequn
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>> Hi Aljoscha,
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> I have looked at the "*Process*" section of the FLIP wiki page. [1]
> >> > > > > > >>>>>>>>>>> This mail thread indicates that it has proceeded to the third step.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> When I looked at the fourth step (the vote step), I didn't find
> >> > > > > > >>>>>>>>>>> the prerequisites for starting the voting process.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> The discussion of this feature has already taken place in the old
> >> > > > > > >>>>>>>>>>> thread. [2] So can you tell me when I should start voting? Can I
> >> > > > > > >>>>>>>>>>> start now?
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> Best,
> >> > > > > > >>>>>>>>>>> Vino
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> [1]:
> >> > > > > > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> >> > > > > > >>>>>>>>>>> [2]:
> >> > > > > > >>>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> leesf <leesf0...@gmail.com> wrote on Thu, Jun 13, 2019 at 9:19 AM:
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> +1 for the FLIP. Thanks, vino, for your efforts.
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> Best,
> >> > > > > > >>>>>>>>>>>> Leesf
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 12, 2019 at 5:46 PM:
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Hi folks,
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> I would like to start the FLIP discussion thread about
> >> > > > > > >>>>>>>>>>>>> supporting local aggregation in Flink.
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> In short, this feature can effectively alleviate data skew.
> >> > > > > > >>>>>>>>>>>>> This is the FLIP:
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> *Motivation* (copied from FLIP)
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Currently, keyed streams are widely used to perform
> >> > > > > > >>>>>>>>>>>>> aggregating operations (e.g., reduce, sum and window) on the
> >> > > > > > >>>>>>>>>>>>> elements that have the same key. When executed at runtime,
> >> > > > > > >>>>>>>>>>>>> the elements with the same key will be sent to and
> >> > > > > > >>>>>>>>>>>>> aggregated by the same task.
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> The performance of these aggregating operations is very
> >> > > > > > >>>>>>>>>>>>> sensitive to the distribution of keys. In cases where the
> >> > > > > > >>>>>>>>>>>>> distribution of keys follows a power law, the performance
> >> > > > > > >>>>>>>>>>>>> will be significantly degraded. Worse, increasing the degree
> >> > > > > > >>>>>>>>>>>>> of parallelism does not help when a task is overloaded by a
> >> > > > > > >>>>>>>>>>>>> single key.
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Local aggregation is a widely adopted method to mitigate the
> >> > > > > > >>>>>>>>>>>>> performance degradation caused by data skew. We can
> >> > > > > > >>>>>>>>>>>>> decompose the aggregating operations into two phases. In the
> >> > > > > > >>>>>>>>>>>>> first phase, we aggregate the elements of the same key at
> >> > > > > > >>>>>>>>>>>>> the sender side to obtain partial results. Then, in the
> >> > > > > > >>>>>>>>>>>>> second phase, these partial results are sent to receivers
> >> > > > > > >>>>>>>>>>>>> according to their keys and are combined to obtain the final
> >> > > > > > >>>>>>>>>>>>> result. Since the number of partial results received by each
> >> > > > > > >>>>>>>>>>>>> receiver is limited by the number of senders, the imbalance
> >> > > > > > >>>>>>>>>>>>> among receivers can be reduced. Besides, by reducing the
> >> > > > > > >>>>>>>>>>>>> amount of transferred data, the performance can be further
> >> > > > > > >>>>>>>>>>>>> improved.
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> *More details*:
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Design documentation:
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Old discussion thread:
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> We are looking forward to your feedback!
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Best,
> >> > > > > > >>>>>>>>>>>>> Vino
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
