Yeah, sorry for not expressing myself clearly. I will try to provide more
details to make sure we are on the same page.

For the DataStream API, this shouldn't be optimized automatically. You have to
explicitly call an API to do the local aggregation and to specify its trigger
policy. Taking average as an example, the user program may look like this
(just a draft):

Assuming the input type is DataStream<Tuple2<String, Int>>:

ds.localAggregate(
        // The local key, which is the String from Tuple2
        0,
        // The partial aggregation function; produces Tuple2<Long, Int>,
        // indicating sum and count
        PartitionAvg(1),
        // Trigger policy; note this should be best effort, and can also be
        // combined with a time-based or memory-size-based trigger
        CountTrigger.of(1000L)
    )
    // The return type of the local aggregate is
    // Tuple2<String, Tuple2<Long, Int>>
    // Key by the required key again
    .keyBy(0)
    // Merge all the partial results and get the final average
    .aggregate(1)

(Again, this is only a draft to explain what it could look like.)
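
To make the sum-and-count idea concrete, here is a minimal plain-Java sketch of
the two phases. It deliberately uses no Flink classes: PartialAvgSketch,
SumCount, localAggregate and globalAverage are hypothetical names chosen for
illustration, and the "senders" are just in-memory lists.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of two-phase averaging; no Flink classes are used. */
final class PartialAvgSketch {

    /** Intermediate result: sum and count, mirroring Tuple2<Long, Int> in the draft. */
    static final class SumCount {
        final long sum;
        final int count;

        SumCount(long sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        /** Combines two partial results; this is what makes average mergeable. */
        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        double average() {
            return count == 0 ? Double.NaN : (double) sum / count;
        }
    }

    /** Phase 1: one sender folds its own (key, value) records into partial results. */
    static Map<String, SumCount> localAggregate(List<Map.Entry<String, Integer>> records) {
        Map<String, SumCount> partials = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            partials.merge(r.getKey(), new SumCount(r.getValue(), 1), SumCount::merge);
        }
        return partials;
    }

    /** Phase 2: the receiver merges the partial results per key, then derives the average. */
    static Map<String, Double> globalAverage(List<Map<String, SumCount>> partialsFromSenders) {
        Map<String, SumCount> merged = new LinkedHashMap<>();
        for (Map<String, SumCount> partials : partialsFromSenders) {
            partials.forEach((k, v) -> merged.merge(k, v, SumCount::merge));
        }
        Map<String, Double> result = new LinkedHashMap<>();
        merged.forEach((k, v) -> result.put(k, v.average()));
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sender1 =
                List.of(Map.entry("a", 1), Map.entry("a", 3), Map.entry("b", 10));
        List<Map.Entry<String, Integer>> sender2 = List.of(Map.entry("a", 5));
        // Key "a": sum 9 over 3 records; key "b": sum 10 over 1 record.
        System.out.println(
                globalAverage(List.of(localAggregate(sender1), localAggregate(sender2))));
    }
}
```

The point of the sketch is that the value shipped between the phases is the
(sum, count) pair rather than an average, so the element type has to change
between the input and the local aggregate output.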

The local aggregate operator can be stateless: we can keep a memory buffer
or another efficient data structure to improve the aggregation performance.
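
As a rough illustration of such a buffer, here is a plain-Java sketch, not a
Flink operator. LocalSumBuffer, maxBufferedRecords and the downstream callback
are hypothetical; the method named prepareSnapshotPreBarrier only mirrors the
StreamOperator hook discussed earlier in this thread and is called by hand here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Plain-Java sketch of a stateless pre-aggregation buffer; not a Flink operator. */
final class LocalSumBuffer {
    // In-memory partial sums per key; insertion-ordered so the output is deterministic.
    private final Map<String, Long> buffer = new LinkedHashMap<>();
    // Hypothetical best-effort, count-based trigger threshold.
    private final int maxBufferedRecords;
    // Hypothetical stand-in for emitting to the downstream keyBy.
    private final BiConsumer<String, Long> downstream;
    private int bufferedRecords = 0;

    LocalSumBuffer(int maxBufferedRecords, BiConsumer<String, Long> downstream) {
        this.maxBufferedRecords = maxBufferedRecords;
        this.downstream = downstream;
    }

    /** Folds one record into the buffer and flushes once the threshold is reached. */
    void processElement(String key, long value) {
        buffer.merge(key, value, Long::sum);
        bufferedRecords++;
        if (bufferedRecords >= maxBufferedRecords) {
            flush();
        }
    }

    /** Flushes everything before a snapshot, so the buffer never needs to be checkpointed. */
    void prepareSnapshotPreBarrier() {
        flush();
    }

    private void flush() {
        buffer.forEach(downstream);
        buffer.clear();
        bufferedRecords = 0;
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        LocalSumBuffer op = new LocalSumBuffer(3, (k, v) -> emitted.add(k + "=" + v));
        op.processElement("a", 1);
        op.processElement("a", 2);
        op.processElement("b", 5);        // third record reaches the threshold: flush
        op.processElement("a", 4);        // stays buffered
        op.prepareSnapshotPreBarrier();   // flushed before the (simulated) barrier
        System.out.println(emitted);      // prints [a=3, b=5, a=4]
    }
}
```

Because every flush emits deltas and then clears the buffer, the downstream
sum over the emitted values for a key equals the sum over the raw records.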

Let me know if you have any other questions.

Best,
Kurt


On Tue, Jun 18, 2019 at 1:29 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi Kurt,
>
> Thanks for your reply.
>
> Actually, I am not against you raising your design.
>
> From your earlier description, I can only guess that your high-level
> implementation is about SQL and that the optimization is internal to the
> API. Is it applied automatically? How does a user configure the trigger for
> pre-aggregation?
>
> Maybe it will sound more reasonable after I get more information.
>
> IMO, first of all, it would be better to make your user interface concrete;
> it's the basis of the discussion.
>
> For example, can you give an example code snippet showing how it helps
> users handle data skew in jobs built with the DataStream API?
>
> If you give more details, we can discuss further. I think that if one design
> introduces an exact interface and another does not, the implementations
> will obviously differ. For example, we introduce an exact API in DataStream
> named localKeyBy; for the pre-aggregation we need to define the trigger
> mechanism of local aggregation, so we found that reusing the window API and
> operator is a good choice. This is a line of reasoning from design to
> implementation.
>
> What do you think?
>
> Best,
> Vino
>
>
> On Tue, Jun 18, 2019 at 11:58 AM Kurt Young <ykt...@gmail.com> wrote:
>
> > Hi Vino,
> >
> > Now I feel that we may have different understandings of what kind of
> > problems or improvements you want to address. Currently, most of the
> > feedback focuses on *how to do a proper local aggregation to improve
> > performance and maybe solve the data skew issue*. And my gut feeling is
> > that this is exactly what users want in the first place, especially those
> > who gave +1s. (Sorry for trying to summarize here; please correct me if
> > I'm wrong.)
> >
> > But I still think the design has somehow diverged from the goal. If we
> > want an efficient and powerful way to do local aggregation, supporting an
> > intermediate result type is essential IMO. Both the runtime's
> > `AggregateFunction` and SQL's `UserDefinedAggregateFunction` properly
> > support an intermediate result type and can do a `merge` operation on it.
> >
> > Now, we have a lightweight alternative which performs well and fits the
> > local aggregation requirements nicely. Most importantly, it's much less
> > complex because it's stateless. And it can also cover a similar
> > multiple-aggregation scenario.
> >
> > I am still not convinced why we shouldn't consider it as a first step.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Jun 18, 2019 at 11:35 AM vino yang <yanghua1...@gmail.com> wrote:
> >
> > > Hi Kurt,
> > >
> > > Thanks for your comments.
> > >
> > > It seems we both implemented a local aggregation feature to mitigate
> > > the data skew issue.
> > > However, IMHO, the API level at which the optimization applies is
> > > different.
> > >
> > > *Your optimization benefits Flink SQL and is not user-facing. (If I
> > > understand it incorrectly, please correct me.)*
> > > *Our implementation exposes it as an optimization API for DataStream;
> > > it is just like a local version of the keyBy API.*
> > >
> > > Based on this, I want to say that supporting it as a DataStream API
> > > provides these advantages:
> > >
> > >
> > >    - The localKeyBy API has clear semantics and is flexible, not only
> > >    for handling data skew but also for implementing some use cases. For
> > >    example, if we want to calculate a multiple-level aggregation, we can
> > >    do it in the local aggregation:
> > >    input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is
> > >    a sub-category, while "b" is a category; here we do not need to
> > >    shuffle data over the network.
> > >    - The users of the DataStream API will benefit from this. Actually,
> > >    we have a lot of scenarios that need the DataStream API. Currently,
> > >    the DataStream API is the cornerstone of the physical plan of Flink
> > >    SQL. With a localKeyBy API, the SQL optimization could at least use
> > >    this optimized API; this is a further topic.
> > >    - Based on the window operator, our state would benefit from Flink
> > >    state and checkpointing; we do not need to worry about OOM and job
> > >    failure.
> > >
> > > Now, about your questions:
> > >
> > > 1. About our design not being able to change the data type, and about
> > > the implementation of average:
> > >
> > > As in my reply to Hequn, localKeyBy is an API provided to the users who
> > > use the DataStream API to build their jobs.
> > > Users should know its semantics and how it differs from the keyBy API,
> > > so if they want an average aggregation, they should carry the local sum
> > > result and the local count result.
> > > I admit that it would be more convenient to use keyBy directly. But we
> > > need to pay a small price to get some benefits. I think this price is
> > > reasonable, considering that the DataStream API itself is a low-level
> > > API (at least for now).
> > >
> > > 2. About the stateless operator and
> > > `StreamOperator::prepareSnapshotPreBarrier()`:
> > >
> > > Actually, I have discussed this with @dianfu in the old mail thread. I
> > > will copy my opinion from there:
> > >
> > >    - For your design, you still need somewhere to let users configure
> > >    the trigger threshold (maybe memory availability?). This design
> > >    cannot guarantee deterministic semantics (which will cause trouble
> > >    for testing and debugging).
> > >    - If the implementation depends on the timing of checkpoints, it
> > >    would affect checkpoint progress, and the buffered data may cause an
> > >    OOM issue. In addition, if the operator is stateless, it cannot
> > >    provide fault tolerance.
> > >
> > > Best,
> > > Vino
> > >
> > > On Tue, Jun 18, 2019 at 9:22 AM Kurt Young <ykt...@gmail.com> wrote:
> > >
> > > > Hi Vino,
> > > >
> > > > Thanks for the proposal. I like the general idea and IMO it's a very
> > > > useful feature.
> > > > But after reading through the document, I feel that we may be
> > > > over-designing the operator required for proper local aggregation.
> > > > The main reason is that we want a clear definition and behavior for
> > > > the "local keyed state", which in my opinion is not necessary for
> > > > local aggregation, at least to start with.
> > > >
> > > > Another issue I noticed is that the local key-by operator cannot
> > > > change the element type, which also rules out a lot of use cases that
> > > > could benefit from local aggregation, like "average".
> > > >
> > > > We also did similar logic in SQL, and the only thing that needs to be
> > > > done is to introduce a stateless, lightweight operator which is
> > > > *chained* before `keyby()`. The operator flushes all buffered
> > > > elements during `StreamOperator::prepareSnapshotPreBarrier()`, which
> > > > makes it stateless.
> > > > By the way, in an earlier version we took a similar approach by
> > > > introducing a stateful local aggregation operator, but it did not
> > > > perform as well as the later one and also affected the barrier
> > > > alignment time. The later one is fairly simple and more efficient.
> > > >
> > > > I would highly suggest that you consider a stateless approach as the
> > > > first step.
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Mon, Jun 17, 2019 at 7:32 PM Jark Wu <imj...@gmail.com> wrote:
> > > >
> > > > > Hi Vino,
> > > > >
> > > > > Thanks for the proposal.
> > > > >
> > > > > Regarding "input.keyBy(0).sum(1)" vs
> > > > > "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have
> > > > > you done any benchmarks?
> > > > > I'm curious how much performance improvement we can get by using a
> > > > > count window as the local operator.
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > >
> > > > >
> > > > > On Mon, 17 Jun 2019 at 17:48, vino yang <yanghua1...@gmail.com> wrote:
> > > > >
> > > > > > Hi Hequn,
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > The purpose of the localKeyBy API is to provide a tool that lets
> > > > > > users do pre-aggregation locally. The behavior of the
> > > > > > pre-aggregation is similar to the keyBy API.
> > > > > >
> > > > > > The three cases are different, so I will describe them one by one:
> > > > > >
> > > > > > 1. input.keyBy(0).sum(1)
> > > > > >
> > > > > > *In this case, the result is event-driven: each event produces one
> > > > > > sum aggregation result, which is the latest sum since the source
> > > > > > started.*
> > > > > >
> > > > > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > > > >
> > > > > > *In this case, the semantics may be problematic: it would do the
> > > > > > local sum aggregation and produce, for every event, the latest
> > > > > > partial result since the source started.*
> > > > > > *These latest partial results for the same key are hashed to one
> > > > > > node to do the global sum aggregation.*
> > > > > > *When the global aggregation receives multiple partial results
> > > > > > (each calculated since the source started) and sums them, it gets
> > > > > > the wrong result.*
> > > > > >
> > > > > > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > > > >
> > > > > > *In this case, it would just produce a partial aggregation result
> > > > > > for the 5 records in the count window. The partial aggregation
> > > > > > results for the same key will be aggregated globally.*
> > > > > >
> > > > > > So the first case and the third case can get the *same* result;
> > > > > > the difference is the output style and the latency.
> > > > > >
> > > > > > Generally speaking, the local key API is just an optimization API.
> > > > > > We do not limit how users use it, but they have to understand its
> > > > > > semantics and use it correctly.
> > > > > >
> > > > > > Best,
> > > > > > Vino
> > > > > >
> > > > > > On Mon, Jun 17, 2019 at 4:18 PM Hequn Cheng <chenghe...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Vino,
> > > > > > >
> > > > > > > Thanks for the proposal, I think it is a very good feature!
> > > > > > >
> > > > > > > One thing I want to make sure of is the semantics of
> > > > > > > `localKeyBy`. From the document, the `localKeyBy` API returns an
> > > > > > > instance of `KeyedStream`, which can also perform sum(). So in
> > > > > > > this case, what are the semantics of `localKeyBy()`? For
> > > > > > > example, will the following code produce the same result, and
> > > > > > > what are the differences between them?
> > > > > > >
> > > > > > > 1. input.keyBy(0).sum(1)
> > > > > > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > > > > > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > > > > > >
> > > > > > > It would also be great if we could add this to the document.
> > > > > > > Thank you very much.
> > > > > > >
> > > > > > > Best, Hequn
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jun 14, 2019 at 11:34 AM vino yang <yanghua1...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Aljoscha,
> > > > > > > >
> > > > > > > > I have looked at the "*Process*" section of the FLIP wiki
> > > > > > > > page. [1] This mail thread indicates that it has proceeded to
> > > > > > > > the third step.
> > > > > > > >
> > > > > > > > When I looked at the fourth step (the vote step), I didn't find
> > > > > > > > the prerequisites for starting the voting process.
> > > > > > > >
> > > > > > > > Considering that this feature has already been discussed in the
> > > > > > > > old thread [2], can you tell me when I should start voting? Can
> > > > > > > > I start now?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Vino
> > > > > > > >
> > > > > > > > [1]:
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > > > > > > [2]:
> > > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > > > >
> > > > > > > > On Thu, Jun 13, 2019 at 9:19 AM leesf <leesf0...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > +1 for the FLIP. Thanks, Vino, for your efforts.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Leesf
> > > > > > > > >
> > > > > > > > > On Wed, Jun 12, 2019 at 5:46 PM vino yang <yanghua1...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi folks,
> > > > > > > > > >
> > > > > > > > > > I would like to start the FLIP discussion thread about
> > > > > > > > > > supporting local aggregation in Flink.
> > > > > > > > > >
> > > > > > > > > > In short, this feature can effectively alleviate data skew.
> > > > > > > > > > This is the FLIP:
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *Motivation* (copied from FLIP)
> > > > > > > > > >
> > > > > > > > > > Currently, keyed streams are widely used to perform
> > > > > > > > > > aggregating operations (e.g., reduce, sum and window) on the
> > > > > > > > > > elements that have the same key. When executed at runtime,
> > > > > > > > > > the elements with the same key will be sent to and
> > > > > > > > > > aggregated by the same task.
> > > > > > > > > >
> > > > > > > > > > The performance of these aggregating operations is very
> > > > > > > > > > sensitive to the distribution of keys. In cases where the
> > > > > > > > > > distribution of keys follows a power law, the performance
> > > > > > > > > > will be significantly degraded. Worse, increasing the degree
> > > > > > > > > > of parallelism does not help when a task is overloaded by a
> > > > > > > > > > single key.
> > > > > > > > > >
> > > > > > > > > > Local aggregation is a widely adopted method to reduce the
> > > > > > > > > > performance degradation caused by data skew. We can
> > > > > > > > > > decompose the aggregating operations into two phases. In the
> > > > > > > > > > first phase, we aggregate the elements of the same key at
> > > > > > > > > > the sender side to obtain partial results. Then, in the
> > > > > > > > > > second phase, these partial results are sent to receivers
> > > > > > > > > > according to their keys and are combined to obtain the final
> > > > > > > > > > result. Since the number of partial results received by each
> > > > > > > > > > receiver is limited by the number of senders, the imbalance
> > > > > > > > > > among receivers can be reduced. Besides, by reducing the
> > > > > > > > > > amount of transferred data, the performance can be further
> > > > > > > > > > improved.
> > > > > > > > > >
> > > > > > > > > > *More details*:
> > > > > > > > > >
> > > > > > > > > > Design documentation:
> > > > > > > > > > https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > > > > > > > >
> > > > > > > > > > Old discussion thread:
> > > > > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > > > > > > > >
> > > > > > > > > > JIRA: FLINK-12786 <https://issues.apache.org/jira/browse/FLINK-12786>
> > > > > > > > > >
> > > > > > > > > > We are looking forward to your feedback!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Vino
> > > > > > > > > >
