Hi Kurt and Piotrek,

Thanks for your comments.

I agree that we can provide a better abstraction to be compatible with two
different implementations.

First of all, I think we should consider what kind of scenarios we need to
support in *API* level?

We have some use cases which need to a customized aggregation through
KeyedProcessFunction, (in the usage of our localKeyBy.window they can use
ProcessWindowFunction).

Shall we support these flexible use scenarios?

Best,
Vino

Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午8:37写道:

> Hi Piotr,
>
> Thanks for joining the discussion. Make “local aggregation" abstract enough
> sounds good to me, we could
> implement and verify alternative solutions for use cases of local
> aggregation. Maybe we will find both solutions
> are appropriate for different scenarios.
>
> Starting from a simple one sounds a practical way to go. What do you think,
> vino?
>
> Best,
> Kurt
>
>
> On Tue, Jun 18, 2019 at 8:10 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
> > Hi Kurt and Vino,
> >
> > I think there is a trade of hat we need to consider for the local
> > aggregation.
> >
> > Generally speaking I would agree with Kurt about local aggregation/pre
> > aggregation not using Flink's state flush the operator on a checkpoint.
> > Network IO is usually cheaper compared to Disks IO. This has however
> couple
> > of issues:
> > 1. It can explode number of in-flight records during checkpoint barrier
> > alignment, making checkpointing slower and decrease the actual
> throughput.
> > 2. This trades Disks IO on the local aggregation machine with CPU (and
> > Disks IO in case of RocksDB) on the final aggregation machine. This is
> > fine, as long there is no huge data skew. If there is only a handful (or
> > even one single) hot keys, it might be better to keep the persistent
> state
> > in the LocalAggregationOperator to offload final aggregation as much as
> > possible.
> > 3. With frequent checkpointing local aggregation effectiveness would
> > degrade.
> >
> > I assume Kurt is correct, that in your use cases stateless operator was
> > behaving better, but I could easily see other use cases as well. For
> > example someone is already using RocksDB, and his job is bottlenecked on
> a
> > single window operator instance because of the data skew. In that case
> > stateful local aggregation would be probably a better choice.
> >
> > Because of that, I think we should eventually provide both versions and
> in
> > the initial version we should at least make the “local aggregation
> engine”
> > abstract enough, that one could easily provide different implementation
> > strategy.
> >
> > Piotrek
> >
> > > On 18 Jun 2019, at 11:46, Kurt Young <ykt...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > For the trigger, it depends on what operator we want to use under the
> > API.
> > > If we choose to use window operator,
> > > we should also use window's trigger. However, I also think reuse window
> > > operator for this scenario may not be
> > > the best choice. The reasons are the following:
> > >
> > > 1. As a lot of people already pointed out, window relies heavily on
> state
> > > and it will definitely effect performance. You can
> > > argue that one can use heap based statebackend, but this will introduce
> > > extra coupling. Especially we have a chance to
> > > design a pure stateless operator.
> > > 2. The window operator is *the most* complicated operator Flink
> currently
> > > have. Maybe we only need to pick a subset of
> > > window operator to achieve the goal, but once the user wants to have a
> > deep
> > > look at the localAggregation operator, it's still
> > > hard to find out what's going on under the window operator. For
> > simplicity,
> > > I would also recommend we introduce a dedicated
> > > lightweight operator, which also much easier for a user to learn and
> use.
> > >
> > > For your question about increasing the burden in
> > > `StreamOperator::prepareSnapshotPreBarrier()`, the only thing this
> > function
> > > need
> > > to do is output all the partial results, it's purely cpu workload, not
> > > introducing any IO. I want to point out that even if we have this
> > > cost, we reduced another barrier align cost of the operator, which is
> the
> > > sync flush stage of the state, if you introduced state. This
> > > flush actually will introduce disk IO, and I think it's worthy to
> > exchange
> > > this cost with purely CPU workload. And we do have some
> > > observations about these two behavior (as i said before, we actually
> > > implemented both solutions), the stateless one actually performs
> > > better both in performance and barrier align time.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Tue, Jun 18, 2019 at 3:40 PM vino yang <yanghua1...@gmail.com>
> wrote:
> > >
> > >> Hi Kurt,
> > >>
> > >> Thanks for your example. Now, it looks more clearly for me.
> > >>
> > >> From your example code snippet, I saw the localAggregate API has three
> > >> parameters:
> > >>
> > >>   1. key field
> > >>   2. PartitionAvg
> > >>   3. CountTrigger: Does this trigger comes from window package?
> > >>
> > >> I will compare our and your design from API and operator level:
> > >>
> > >> *From the API level:*
> > >>
> > >> As I replied to @dianfu in the old email thread,[1] the Window API can
> > >> provide the second and the third parameter right now.
> > >>
> > >> If you reuse specified interface or class, such as *Trigger* or
> > >> *CounterTrigger* provided by window package, but do not use window
> API,
> > >> it's not reasonable.
> > >> And if you do not reuse these interface or class, you would need to
> > >> introduce more things however they are looked similar to the things
> > >> provided by window package.
> > >>
> > >> The window package has provided several types of the window and many
> > >> triggers and let users customize it. What's more, the user is more
> > familiar
> > >> with Window API.
> > >>
> > >> This is the reason why we just provide localKeyBy API and reuse the
> > window
> > >> API. It reduces unnecessary components such as triggers and the
> > mechanism
> > >> of buffer (based on count num or time).
> > >> And it has a clear and easy to understand semantics.
> > >>
> > >> *From the operator level:*
> > >>
> > >> We reused window operator, so we can get all the benefits from state
> and
> > >> checkpoint.
> > >>
> > >> From your design, you named the operator under localAggregate API is a
> > >> *stateless* operator. IMO, it is still a state, it is just not Flink
> > >> managed state.
> > >> About the memory buffer (I think it's still not very clear, if you
> have
> > >> time, can you give more detail information or answer my questions), I
> > have
> > >> some questions:
> > >>
> > >>   - if it just a raw JVM heap memory buffer, how to support fault
> > >>   tolerance, if the job is configured EXACTLY-ONCE semantic guarantee?
> > >>   - if you thought the memory buffer(non-Flink state), has better
> > >>   performance. In our design, users can also config HEAP state backend
> > to
> > >>   provide the performance close to your mechanism.
> > >>   - `StreamOperator::prepareSnapshotPreBarrier()` related to the
> timing
> > of
> > >>   snapshot. IMO, the flush action should be a synchronized action? (if
> > >> not,
> > >>   please point out my mistake) I still think we should not depend on
> the
> > >>   timing of checkpoint. Checkpoint related operations are inherent
> > >>   performance sensitive, we should not increase its burden anymore.
> Our
> > >>   implementation based on the mechanism of Flink's checkpoint, which
> can
> > >>   benefit from the asnyc snapshot and incremental checkpoint. IMO, the
> > >>   performance is not a problem, and we also do not find the
> performance
> > >> issue
> > >>   in our production.
> > >>
> > >> [1]:
> > >>
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > >>
> > >> Best,
> > >> Vino
> > >>
> > >> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 下午2:27写道:
> > >>
> > >>> Yeah, sorry for not expressing myself clearly. I will try to provide
> > more
> > >>> details to make sure we are on the same page.
> > >>>
> > >>> For DataStream API, it shouldn't be optimized automatically. You have
> > to
> > >>> explicitly call API to do local aggregation
> > >>> as well as the trigger policy of the local aggregation. Take average
> > for
> > >>> example, the user program may look like this (just a draft):
> > >>>
> > >>> assuming the input type is DataStream<Tupl2<String, Int>>
> > >>>
> > >>> ds.localAggregate(
> > >>>        0,                                       // The local key,
> which
> > >> is
> > >>> the String from Tuple2
> > >>>        PartitionAvg(1),                 // The partial aggregation
> > >>> function, produces Tuple2<Long, Int>, indicating sum and count
> > >>>        CountTrigger.of(1000L)    // Trigger policy, note this should
> be
> > >>> best effort, and also be composited with time based or memory size
> > based
> > >>> trigger
> > >>>    )                                           // The return type is
> > >> local
> > >>> aggregate Tuple2<String, Tupl2<Long, Int>>
> > >>>    .keyBy(0)                             // Further keyby it with
> > >> required
> > >>> key
> > >>>    .aggregate(1)                      // This will merge all the
> > partial
> > >>> results and get the final average.
> > >>>
> > >>> (This is only a draft, only trying to explain what it looks like. )
> > >>>
> > >>> The local aggregate operator can be stateless, we can keep a memory
> > >> buffer
> > >>> or other efficient data structure to improve the aggregate
> performance.
> > >>>
> > >>> Let me know if you have any other questions.
> > >>>
> > >>> Best,
> > >>> Kurt
> > >>>
> > >>>
> > >>> On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com>
> > wrote:
> > >>>
> > >>>> Hi Kurt,
> > >>>>
> > >>>> Thanks for your reply.
> > >>>>
> > >>>> Actually, I am not against you to raise your design.
> > >>>>
> > >>>> From your description before, I just can imagine your high-level
> > >>>> implementation is about SQL and the optimization is inner of the
> API.
> > >> Is
> > >>> it
> > >>>> automatically? how to give the configuration option about trigger
> > >>>> pre-aggregation?
> > >>>>
> > >>>> Maybe after I get more information, it sounds more reasonable.
> > >>>>
> > >>>> IMO, first of all, it would be better to make your user interface
> > >>> concrete,
> > >>>> it's the basis of the discussion.
> > >>>>
> > >>>> For example, can you give an example code snippet to introduce how
> to
> > >>> help
> > >>>> users to process data skew caused by the jobs which built with
> > >> DataStream
> > >>>> API?
> > >>>>
> > >>>> If you give more details we can discuss further more. I think if one
> > >>> design
> > >>>> introduces an exact interface and another does not.
> > >>>>
> > >>>> The implementation has an obvious difference. For example, we
> > introduce
> > >>> an
> > >>>> exact API in DataStream named localKeyBy, about the pre-aggregation
> we
> > >>> need
> > >>>> to define the trigger mechanism of local aggregation, so we find
> > reused
> > >>>> window API and operator is a good choice. This is a reasoning link
> > from
> > >>>> design to implementation.
> > >>>>
> > >>>> What do you think?
> > >>>>
> > >>>> Best,
> > >>>> Vino
> > >>>>
> > >>>>
> > >>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午11:58写道:
> > >>>>
> > >>>>> Hi Vino,
> > >>>>>
> > >>>>> Now I feel that we may have different understandings about what
> kind
> > >> of
> > >>>>> problems or improvements you want to
> > >>>>> resolve. Currently, most of the feedback are focusing on *how to
> do a
> > >>>>> proper local aggregation to improve performance
> > >>>>> and maybe solving the data skew issue*. And my gut feeling is this
> is
> > >>>>> exactly what users want at the first place,
> > >>>>> especially those +1s. (Sorry to try to summarize here, please
> correct
> > >>> me
> > >>>> if
> > >>>>> i'm wrong).
> > >>>>>
> > >>>>> But I still think the design is somehow diverged from the goal. If
> we
> > >>>> want
> > >>>>> to have an efficient and powerful way to
> > >>>>> have local aggregation, supporting intermedia result type is
> > >> essential
> > >>>> IMO.
> > >>>>> Both runtime's `AggregateFunction` and
> > >>>>> SQL`s `UserDefinedAggregateFunction` have a proper support of
> > >>>> intermediate
> > >>>>> result type and can do `merge` operation
> > >>>>> on them.
> > >>>>>
> > >>>>> Now, we have a lightweight alternatives which performs well, and
> > >> have a
> > >>>>> nice fit with the local aggregate requirements.
> > >>>>> Mostly importantly,  it's much less complex because it's stateless.
> > >> And
> > >>>> it
> > >>>>> can also achieve the similar multiple-aggregation
> > >>>>> scenario.
> > >>>>>
> > >>>>> I still not convinced why we shouldn't consider it as a first step.
> > >>>>>
> > >>>>> Best,
> > >>>>> Kurt
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Hi Kurt,
> > >>>>>>
> > >>>>>> Thanks for your comments.
> > >>>>>>
> > >>>>>> It seems we both implemented local aggregation feature to optimize
> > >>> the
> > >>>>>> issue of data skew.
> > >>>>>> However, IMHO, the API level of optimizing revenue is different.
> > >>>>>>
> > >>>>>> *Your optimization benefits from Flink SQL and it's not user's
> > >>>> faces.(If
> > >>>>> I
> > >>>>>> understand it incorrectly, please correct this.)*
> > >>>>>> *Our implementation employs it as an optimization tool API for
> > >>>>> DataStream,
> > >>>>>> it just like a local version of the keyBy API.*
> > >>>>>>
> > >>>>>> Based on this, I want to say support it as a DataStream API can
> > >>> provide
> > >>>>>> these advantages:
> > >>>>>>
> > >>>>>>
> > >>>>>>   - The localKeyBy API has a clear semantic and it's flexible not
> > >>> only
> > >>>>> for
> > >>>>>>   processing data skew but also for implementing some user cases,
> > >>> for
> > >>>>>>   example, if we want to calculate the multiple-level aggregation,
> > >>> we
> > >>>>> can
> > >>>>>> do
> > >>>>>>   multiple-level aggregation in the local aggregation:
> > >>>>>>   input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here
> > >> "a"
> > >>>> is
> > >>>>> a
> > >>>>>>   sub-category, while "b" is a category, here we do not need to
> > >>>> shuffle
> > >>>>>> data
> > >>>>>>   in the network.
> > >>>>>>   - The users of DataStream API will benefit from this. Actually,
> > >> we
> > >>>>> have
> > >>>>>>   a lot of scenes need to use DataStream API. Currently,
> > >> DataStream
> > >>>> API
> > >>>>> is
> > >>>>>>   the cornerstone of the physical plan of Flink SQL. With a
> > >>> localKeyBy
> > >>>>>> API,
> > >>>>>>   the optimization of SQL at least may use this optimized API,
> > >> this
> > >>>> is a
> > >>>>>>   further topic.
> > >>>>>>   - Based on the window operator, our state would benefit from
> > >> Flink
> > >>>>> State
> > >>>>>>   and checkpoint, we do not need to worry about OOM and job
> > >> failed.
> > >>>>>>
> > >>>>>> Now, about your questions:
> > >>>>>>
> > >>>>>> 1. About our design cannot change the data type and about the
> > >>>>>> implementation of average:
> > >>>>>>
> > >>>>>> Just like my reply to Hequn, the localKeyBy is an API provides to
> > >> the
> > >>>>> users
> > >>>>>> who use DataStream API to build their jobs.
> > >>>>>> Users should know its semantics and the difference with keyBy API,
> > >> so
> > >>>> if
> > >>>>>> they want to the average aggregation, they should carry local sum
> > >>>> result
> > >>>>>> and local count result.
> > >>>>>> I admit that it will be convenient to use keyBy directly. But we
> > >> need
> > >>>> to
> > >>>>>> pay a little price when we get some benefits. I think this price
> is
> > >>>>>> reasonable. Considering that the DataStream API itself is a
> > >> low-level
> > >>>> API
> > >>>>>> (at least for now).
> > >>>>>>
> > >>>>>> 2. About stateless operator and
> > >>>>>> `StreamOperator::prepareSnapshotPreBarrier()`:
> > >>>>>>
> > >>>>>> Actually, I have discussed this opinion with @dianfu in the old
> > >> mail
> > >>>>>> thread. I will copy my opinion from there:
> > >>>>>>
> > >>>>>>   - for your design, you still need somewhere to give the users
> > >>>>> configure
> > >>>>>>   the trigger threshold (maybe memory availability?), this design
> > >>>> cannot
> > >>>>>>   guarantee a deterministic semantics (it will bring trouble for
> > >>>> testing
> > >>>>>> and
> > >>>>>>   debugging).
> > >>>>>>   - if the implementation depends on the timing of checkpoint, it
> > >>>> would
> > >>>>>>   affect the checkpoint's progress, and the buffered data may
> > >> cause
> > >>>> OOM
> > >>>>>>   issue. In addition, if the operator is stateless, it can not
> > >>> provide
> > >>>>>> fault
> > >>>>>>   tolerance.
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Vino
> > >>>>>>
> > >>>>>> Kurt Young <ykt...@gmail.com> 于2019年6月18日周二 上午9:22写道:
> > >>>>>>
> > >>>>>>> Hi Vino,
> > >>>>>>>
> > >>>>>>> Thanks for the proposal, I like the general idea and IMO it's
> > >> very
> > >>>>> useful
> > >>>>>>> feature.
> > >>>>>>> But after reading through the document, I feel that we may over
> > >>>> design
> > >>>>>> the
> > >>>>>>> required
> > >>>>>>> operator for proper local aggregation. The main reason is we want
> > >>> to
> > >>>>>> have a
> > >>>>>>> clear definition and behavior about the "local keyed state" which
> > >>> in
> > >>>> my
> > >>>>>>> opinion is not
> > >>>>>>> necessary for local aggregation, at least for start.
> > >>>>>>>
> > >>>>>>> Another issue I noticed is the local key by operator cannot
> > >> change
> > >>>>>> element
> > >>>>>>> type, it will
> > >>>>>>> also restrict a lot of use cases which can be benefit from local
> > >>>>>>> aggregation, like "average".
> > >>>>>>>
> > >>>>>>> We also did similar logic in SQL and the only thing need to be
> > >> done
> > >>>> is
> > >>>>>>> introduce
> > >>>>>>> a stateless lightweight operator which is *chained* before
> > >>> `keyby()`.
> > >>>>> The
> > >>>>>>> operator will flush all buffered
> > >>>>>>> elements during `StreamOperator::prepareSnapshotPreBarrier()` and
> > >>>> make
> > >>>>>>> himself stateless.
> > >>>>>>> By the way, in the earlier version we also did the similar
> > >> approach
> > >>>> by
> > >>>>>>> introducing a stateful
> > >>>>>>> local aggregation operator but it's not performed as well as the
> > >>>> later
> > >>>>>> one,
> > >>>>>>> and also effect the barrie
> > >>>>>>> alignment time. The later one is fairly simple and more
> > >> efficient.
> > >>>>>>>
> > >>>>>>> I would highly suggest you to consider to have a stateless
> > >> approach
> > >>>> at
> > >>>>>> the
> > >>>>>>> first step.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Kurt
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com>
> > >> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Vino,
> > >>>>>>>>
> > >>>>>>>> Thanks for the proposal.
> > >>>>>>>>
> > >>>>>>>> Regarding to the "input.keyBy(0).sum(1)" vs
> > >>>>>>>> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)",
> > >> have
> > >>>> you
> > >>>>>>> done
> > >>>>>>>> some benchmark?
> > >>>>>>>> Because I'm curious about how much performance improvement can
> > >> we
> > >>>> get
> > >>>>>> by
> > >>>>>>>> using count window as the local operator.
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Jark
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com
> > >>>
> > >>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Hequn,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for your reply.
> > >>>>>>>>>
> > >>>>>>>>> The purpose of localKeyBy API is to provide a tool which can
> > >>> let
> > >>>>>> users
> > >>>>>>> do
> > >>>>>>>>> pre-aggregation in the local. The behavior of the
> > >>> pre-aggregation
> > >>>>> is
> > >>>>>>>>> similar to keyBy API.
> > >>>>>>>>>
> > >>>>>>>>> So the three cases are different, I will describe them one by
> > >>>> one:
> > >>>>>>>>>
> > >>>>>>>>> 1. input.keyBy(0).sum(1)
> > >>>>>>>>>
> > >>>>>>>>> *In this case, the result is event-driven, each event can
> > >>> produce
> > >>>>> one
> > >>>>>>> sum
> > >>>>>>>>> aggregation result and it is the latest one from the source
> > >>>> start.*
> > >>>>>>>>>
> > >>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > >>>>>>>>>
> > >>>>>>>>> *In this case, the semantic may have a problem, it would do
> > >> the
> > >>>>> local
> > >>>>>>> sum
> > >>>>>>>>> aggregation and will produce the latest partial result from
> > >> the
> > >>>>>> source
> > >>>>>>>>> start for every event. *
> > >>>>>>>>> *These latest partial results from the same key are hashed to
> > >>> one
> > >>>>>> node
> > >>>>>>> to
> > >>>>>>>>> do the global sum aggregation.*
> > >>>>>>>>> *In the global aggregation, when it received multiple partial
> > >>>>> results
> > >>>>>>>> (they
> > >>>>>>>>> are all calculated from the source start) and sum them will
> > >> get
> > >>>> the
> > >>>>>>> wrong
> > >>>>>>>>> result.*
> > >>>>>>>>>
> > >>>>>>>>> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > >>>>>>>>>
> > >>>>>>>>> *In this case, it would just get a partial aggregation result
> > >>> for
> > >>>>>> the 5
> > >>>>>>>>> records in the count window. The partial aggregation results
> > >>> from
> > >>>>> the
> > >>>>>>>> same
> > >>>>>>>>> key will be aggregated globally.*
> > >>>>>>>>>
> > >>>>>>>>> So the first case and the third case can get the *same*
> > >> result,
> > >>>> the
> > >>>>>>>>> difference is the output-style and the latency.
> > >>>>>>>>>
> > >>>>>>>>> Generally speaking, the local key API is just an optimization
> > >>>> API.
> > >>>>> We
> > >>>>>>> do
> > >>>>>>>>> not limit the user's usage, but the user has to understand
> > >> its
> > >>>>>>> semantics
> > >>>>>>>>> and use it correctly.
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Vino
> > >>>>>>>>>
> > >>>>>>>>> Hequn Cheng <chenghe...@gmail.com> 于2019年6月17日周一 下午4:18写道:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Vino,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for the proposal, I think it is a very good feature!
> > >>>>>>>>>>
> > >>>>>>>>>> One thing I want to make sure is the semantics for the
> > >>>>>> `localKeyBy`.
> > >>>>>>>> From
> > >>>>>>>>>> the document, the `localKeyBy` API returns an instance of
> > >>>>>>> `KeyedStream`
> > >>>>>>>>>> which can also perform sum(), so in this case, what's the
> > >>>>> semantics
> > >>>>>>> for
> > >>>>>>>>>> `localKeyBy()`. For example, will the following code share
> > >>> the
> > >>>>> same
> > >>>>>>>>> result?
> > >>>>>>>>>> and what're the differences between them?
> > >>>>>>>>>>
> > >>>>>>>>>> 1. input.keyBy(0).sum(1)
> > >>>>>>>>>> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > >>>>>>>>>> 3.
> > >> input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > >>>>>>>>>>
> > >>>>>>>>>> Would also be great if we can add this into the document.
> > >>> Thank
> > >>>>> you
> > >>>>>>>> very
> > >>>>>>>>>> much.
> > >>>>>>>>>>
> > >>>>>>>>>> Best, Hequn
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Fri, Jun 14, 2019 at 11:34 AM vino yang <
> > >>>>> yanghua1...@gmail.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Aljoscha,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I have looked at the "*Process*" section of FLIP wiki
> > >>>> page.[1]
> > >>>>>> This
> > >>>>>>>>> mail
> > >>>>>>>>>>> thread indicates that it has proceeded to the third step.
> > >>>>>>>>>>>
> > >>>>>>>>>>> When I looked at the fourth step(vote step), I didn't
> > >> find
> > >>>> the
> > >>>>>>>>>>> prerequisites for starting the voting process.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Considering that the discussion of this feature has been
> > >>> done
> > >>>>> in
> > >>>>>>> the
> > >>>>>>>>> old
> > >>>>>>>>>>> thread. [2] So can you tell me when should I start
> > >> voting?
> > >>>> Can
> > >>>>> I
> > >>>>>>>> start
> > >>>>>>>>>> now?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Vino
> > >>>>>>>>>>>
> > >>>>>>>>>>> [1]:
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > >>>>>>>>>>> [2]:
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > >>>>>>>>>>>
> > >>>>>>>>>>> leesf <leesf0...@gmail.com> 于2019年6月13日周四 上午9:19写道:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> +1 for the FLIP, thank vino for your efforts.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Leesf
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> vino yang <yanghua1...@gmail.com> 于2019年6月12日周三
> > >>> 下午5:46写道:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi folks,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I would like to start the FLIP discussion thread
> > >> about
> > >>>>>>> supporting
> > >>>>>>>>>> local
> > >>>>>>>>>>>>> aggregation in Flink.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> In short, this feature can effectively alleviate data
> > >>>> skew.
> > >>>>>>> This
> > >>>>>>>> is
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> FLIP:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> *Motivation* (copied from FLIP)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Currently, keyed streams are widely used to perform
> > >>>>>> aggregating
> > >>>>>>>>>>>> operations
> > >>>>>>>>>>>>> (e.g., reduce, sum and window) on the elements that
> > >>> have
> > >>>>> the
> > >>>>>>> same
> > >>>>>>>>>> key.
> > >>>>>>>>>>>> When
> > >>>>>>>>>>>>> executed at runtime, the elements with the same key
> > >>> will
> > >>>> be
> > >>>>>>> sent
> > >>>>>>>> to
> > >>>>>>>>>> and
> > >>>>>>>>>>>>> aggregated by the same task.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> The performance of these aggregating operations is
> > >> very
> > >>>>>>> sensitive
> > >>>>>>>>> to
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>> distribution of keys. In the cases where the
> > >>> distribution
> > >>>>> of
> > >>>>>>> keys
> > >>>>>>>>>>>> follows a
> > >>>>>>>>>>>>> powerful law, the performance will be significantly
> > >>>>>> downgraded.
> > >>>>>>>>> More
> > >>>>>>>>>>>>> unluckily, increasing the degree of parallelism does
> > >>> not
> > >>>>> help
> > >>>>>>>> when
> > >>>>>>>>> a
> > >>>>>>>>>>> task
> > >>>>>>>>>>>>> is overloaded by a single key.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Local aggregation is a widely-adopted method to
> > >> reduce
> > >>>> the
> > >>>>>>>>>> performance
> > >>>>>>>>>>>>> degraded by data skew. We can decompose the
> > >> aggregating
> > >>>>>>>> operations
> > >>>>>>>>>> into
> > >>>>>>>>>>>> two
> > >>>>>>>>>>>>> phases. In the first phase, we aggregate the elements
> > >>> of
> > >>>>> the
> > >>>>>>> same
> > >>>>>>>>> key
> > >>>>>>>>>>> at
> > >>>>>>>>>>>>> the sender side to obtain partial results. Then at
> > >> the
> > >>>>> second
> > >>>>>>>>> phase,
> > >>>>>>>>>>>> these
> > >>>>>>>>>>>>> partial results are sent to receivers according to
> > >>> their
> > >>>>> keys
> > >>>>>>> and
> > >>>>>>>>> are
> > >>>>>>>>>>>>> combined to obtain the final result. Since the number
> > >>> of
> > >>>>>>> partial
> > >>>>>>>>>>> results
> > >>>>>>>>>>>>> received by each receiver is limited by the number of
> > >>>>>> senders,
> > >>>>>>>> the
> > >>>>>>>>>>>>> imbalance among receivers can be reduced. Besides, by
> > >>>>>> reducing
> > >>>>>>>> the
> > >>>>>>>>>>> amount
> > >>>>>>>>>>>>> of transferred data the performance can be further
> > >>>>> improved.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> *More details*:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Design documentation:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Old discussion thread:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> JIRA: FLINK-12786 <
> > >>>>>>>>> https://issues.apache.org/jira/browse/FLINK-12786
> > >>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> We are looking forwards to your feedback!
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>> Vino
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>

Reply via email to