Hi Kurt,
Sorry for the late reply. Yes, your statement basically reflects the
conclusions of our discussion.
Yesterday, I was busy with another proposal and some other things.
Some parts of the FLIP documentation have not been fully updated.
Let's listen to the further opinion after many developer
Hi all,
I have talked to vino offline to try to get a better understanding of the
motivation for this FLIP and the functionality he wanted it to have. It
seems there were indeed some misunderstandings in the previous
discussions. I will try to summarize the original scenario and requirements
from vino and s
Hi Jark,
> I will call them "local keyed state" because they have different
semantics with keyed state, even if "ListState", "MapState" are keyed state
primitives currently. From the point of my view, this exposes local keyed
state to users.
Actually, it depends on how you understand it. From a certa
Hi Vino,
> The local keyed state we introduced is not exposed to the outside!
I have read your design and understand how you implement local
aggregation via local keyed state, and how the local keyed state works.
Currently, Flink exposes two basic kinds of state: operator state and keyed
state.
Hi Jark,
*About local keyed state:*
I object to moving it out of this FLIP. It's one of the ways we support
Local aggregation on the implementation of operator level, though not the
only one.
I guess you have misunderstood my last reply. I was just telling you the
difference between `DataStream#process
Hi Vino,
So the difference between `DataStream.localKeyBy().process()` and
`DataStream.process()` is that the former can access keyed state and the
latter can only access operator state.
I think it's out of the scope of designing a local aggregation API. It
might be an extension of state API, i.e
Hi all,
I also think it's a good idea that we need to agree on the API level first.
I am sorry, we did not give some usage examples of the API in the FLIP
documentation before. This may have caused some misunderstandings about the
discussion of this mail thread.
So, now I have added some usage e
Hi Jark,
`DataStream.localKeyBy().process()` has a key difference from
`DataStream.process()`. The former API receives a `KeyedProcessFunction`
(sorry, my previous reply may have misled you); the latter API receives a
`ProcessFunction`. When you read the Javadoc of ProcessFunction,
you
Hi Piotr,
I think the state migration issue you raised is a good point. Having
"stream.enableLocalAggregation(Trigger)" might add some implicit operators
for which users can't set a uid, causing state compatibility/evolution
problems.
So let's put this in rejected alternatives.
Hi Vino,
You mentioned s
Hi Jark and Vino,
I agree fully with Jark, that in order to have the discussion focused and to
limit the number of parallel topics, we should first focus on one topic. We can
first decide on the API and later we can discuss the runtime details. At least
as long as we keep the potential requirem
Hi Jark,
Similar questions and responses have been repeated many times.
Why didn't we spend more sections discussing the API?
Because we are trying to reuse the capabilities of KeyedStream. The
localKeyBy API just returns a KeyedStream; that's our design, so we can get
all the benefits of the KeyedStream an
Thanks for the long discussion Vino, Kurt, Hequn, Piotr and others,
It seems that we still have some different ideas about the API
(localKeyBy()?) and implementation details (reuse window operator? local
keyed state?).
And the discussion is stalled and mixed with motivation and API and
implementat
Hi Kurt,
To answer your questions:
a) Sorry, I have only updated the Google doc so far and have not yet had
time to update the FLIP. I will update the FLIP as soon as possible.
About your description of this point, I have a question: what do you mean
by "how do we combine with
`AggregateFunction`"?
I have shown you the examples
Hi Simon,
IMO, we do not need special processing for your example scenarios; Flink
suggests that users extract watermarks in the source function.
Generally, IDLE is a temporary status. When data arrives, it will send
ACTIVE status downstream and processing will continue.
Keep in mind,
Hi Vino
Thanks for your reply.
It seems feasible if a StreamStatus.IDLE is sent downstream. I still have
two questions.
1. Do we need to add a method that allows users to control when
StreamStatus.IDLE is sent downstream in this case?
2. If partial data comes after your IDLE status to downstr
Hi Simon,
Good question!
For event time semantics, reusing the window operator keeps the correct
behavior, which is the same as that of the current window operator. The
window operator will trigger based on the watermark.
About your example, the window of the three partitions will trigger normally.
For t
Hi vino
Thanks for the proposal.
For local aggregation, I have a question about doing this in window aggregation.
As we know, window aggregation such as a sliding window should be based on
a time trigger, and there may be a problem in event time if we do local
aggregation. For example if I want to do a
Hi vino,
One thing to add: for a), I think using one or two examples, such as how to
do local aggregation on a sliding window
and how to do local aggregation on an unbounded aggregate, would help a
lot.
Best,
Kurt
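
To illustrate the unbounded-aggregate case asked about above, here is a minimal plain-Java sketch of a two-phase average. It is not Flink code and is not taken from the FLIP. The static methods only mirror the shape of Flink's `AggregateFunction` (`add`, `merge`, `getResult`), and the class `TwoPhaseAvgSketch`, the accumulator `AvgAcc` and the helper `localAggregate` are hypothetical names chosen for this example. The point it shows is that the local phase has to ship accumulators (sum and count) rather than finished averages, so that the downstream merge stays exact.

```java
import java.util.Arrays;
import java.util.List;

public class TwoPhaseAvgSketch {

    /** Accumulator for an average: carries sum and count, not the average itself. */
    public static final class AvgAcc {
        long sum;
        long count;
    }

    /** Folds one input value into the accumulator. */
    public static AvgAcc add(AvgAcc acc, long value) {
        acc.sum += value;
        acc.count++;
        return acc;
    }

    /** Combines two partial accumulators into a new one. */
    public static AvgAcc merge(AvgAcc a, AvgAcc b) {
        AvgAcc merged = new AvgAcc();
        merged.sum = a.sum + b.sum;
        merged.count = a.count + b.count;
        return merged;
    }

    /** Turns an accumulator into the final average (0.0 for an empty accumulator). */
    public static double getResult(AvgAcc acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    /** Local phase (hypothetical helper): pre-aggregates the values seen by one upstream task. */
    public static AvgAcc localAggregate(List<Long> values) {
        AvgAcc acc = new AvgAcc();
        for (long v : values) {
            add(acc, v);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Long> partitionA = Arrays.asList(1L, 2L, 3L, 4L); // local average 2.5
        List<Long> partitionB = Arrays.asList(10L, 20L);       // local average 15.0

        // Global phase: merge the partial accumulators, then compute the result.
        AvgAcc merged = merge(localAggregate(partitionA), localAggregate(partitionB));
        System.out.println("merged average = " + getResult(merged)); // 40 / 6

        // Averaging the two local averages instead would give (2.5 + 15.0) / 2 = 8.75,
        // which is not the average of the six input values.
    }
}
```

In this sketch only two accumulators cross the partition boundary instead of six records, which is the kind of reduction local aggregation is meant to provide.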
On Mon, Jun 24, 2019 at 6:06 PM Kurt Young wrote:
> Hi vino,
>
> I think there
Hi vino,
I think there are several things that still need discussion.
a) We all agree that we should first go with a unified abstraction, but the
abstraction is not reflected in the FLIP.
If your answer is the "localKeyBy" API, then I would ask how do we combine with
`AggregateFunction`, and how do
we do
Hi Kurt,
You did not raise any further objections, so I thought you had
agreed with the design after we promised to support two kinds of
implementation.
At the API level, we have answered your question about passing an
AggregateFunction to do the aggregation. No matter introduce localKeyBy API
Hi vino,
Sorry, I don't see a consensus about reusing the window operator and keeping
the API design of localKeyBy. But I think we should definitely give more
thought to this topic.
I will also try to loop in Stephan for this discussion.
Best,
Kurt
On Mon, Jun 24, 2019 at 3:26 PM vino yang wrote:
> Hi a
Hi all,
I am happy that we have had a wonderful discussion and received many
valuable opinions in the last few days.
Now, let me try to summarize the design changes on which we have reached
consensus:
- provide a unified abstraction to support two kinds of implementation;
- reuse WindowOpe
Hi Kurt,
Thanks for your comments.
It seems we have come to a consensus that we should use local aggregation
to alleviate the performance degradation caused by data skew. In this FLIP,
our key solution is to introduce local keyed partition to achieve this goal.
I also agree that we can benefit a lot from the usa
Hi all,
As vino said in previous emails, I think we should first discuss and decide
what kind of use cases this FLIP wants to
resolve, and what the API should look like. From my side, I think this is
probably the root cause of the current divergence.
My understanding is (from the FLIP title and motivatio
Hi Hequn,
Thanks for your comments!
I agree that allowing local aggregation to reuse the window API and refining
the window operator so that it matches both requirements (ours and Kurt's)
is a good decision!
Concerning your questions:
1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may
Hi,
Thanks a lot for your great discussion and great to see that some agreement
has been reached on the "local aggregate engine"!
===> Considering the abstract engine,
I'm wondering whether it would be valuable for us to extend the current
window to meet both demands raised by Kurt and Vino. There are some bene
Hi Kurt and Piotrek,
Thanks for your comments.
I agree that we can provide a better abstraction to be compatible with two
different implementations.
First of all, I think we should consider what kind of scenarios we need to
support at the *API* level.
We have some use cases which need to a customiz
Hi Piotr,
Thanks for joining the discussion. Making "local aggregation" abstract enough
sounds good to me; we could
implement and verify alternative solutions for use cases of local
aggregation. Maybe we will find both solutions
are appropriate for different scenarios.
Starting from a simple one so
Hi Kurt and Vino,
I think there is a trade-off that we need to consider for the local aggregation.
Generally speaking I would agree with Kurt about local
aggregation/pre-aggregation not using Flink's state and instead flushing
the operator on a checkpoint. Network
IO is usually cheaper compared to disk IO. Thi
Hi Kurt,
Thanks for your reply.
If you do not depend on the window operator, that means you need to provide
many Trigger-related implementations, as the window operator does.
What's more, you worry about the complexity of the window operator but
ignore the flexibility which the window operator provides for the
Hi,
For the trigger, it depends on which operator we want to use under the API.
If we choose to use the window operator,
we should also use the window's trigger. However, I also think reusing the
window operator for this scenario may not be
the best choice. The reasons are the following:
1. As a lot of people a
Hi Kurt,
Thanks for your example. Now it is clearer to me.
From your example code snippet, I saw the localAggregate API has three
parameters:
1. key field
2. PartitionAvg
3. CountTrigger: Does this trigger come from the window package?
I will compare our and your design from AP
Yeah, sorry for not expressing myself clearly. I will try to provide more
details to make sure we are on the same page.
For the DataStream API, it shouldn't be optimized automatically. You have to
explicitly call the API to do local aggregation
and to specify the trigger policy of the local aggregation. Take
Hi Kurt,
Thanks for your reply.
Actually, I am not against you proposing your design.
From your earlier description, I can only imagine that your high-level
implementation is about SQL and that the optimization is internal to the
API. Is it automatic? How would one set the configuration option about trigger
pre
Hi Vino,
Now I feel that we may have different understandings of what kind of
problems or improvements you want to
resolve. Currently, most of the feedback is focusing on *how to do a
proper local aggregation to improve performance
and maybe solve the data skew issue*. And my gut feeling is
Hi Kurt,
Thanks for your comments.
It seems we have both implemented a local aggregation feature to optimize
the issue of data skew.
However, IMHO, the API level at which the optimization pays off is different.
*Your optimization benefits from Flink SQL and it is not user-facing. (If I
understand it incorrectly, ple
Hi Jark,
We have done a comparative test. The effect is obvious.
From our observation, the optimization effect mainly depends on two factors:
- the degree of the skew: this factor depends on the user's business;
- the size of the window: localKeyBy supports all the types of window
which provid
Hi Vino,
Thanks for the proposal. I like the general idea and IMO it's a very useful
feature.
But after reading through the document, I feel that we may be
over-designing the required
operator for proper local aggregation. The main reason is we want to have a
clear definition and behavior about the "local
Hi Vino,
Thanks for the proposal.
Regarding "input.keyBy(0).sum(1)" vs
"input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you run
any benchmarks?
I'm curious about how much performance improvement we can get by
using a count window as the local operator.
Best,
Jark
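
To make the comparison in the question above concrete, here is a minimal plain-Java simulation of the two pipelines over a finite input. It is only a sketch under stated assumptions, not a benchmark and not Flink code: `localKeyBy` exists only as the API proposed in this thread, and the class `LocalAggSketch` and its methods `globalSum`, `localCountWindowSum` and `sampleInput` are hypothetical names introduced for illustration. The sketch compares final per-key totals and the number of records that would cross the shuffle. Records that never fill a count window are simply not emitted here, so the sample input uses per-key counts that are multiples of 5.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalAggSketch {

    /** Final per-key sums; stands in for keyBy(0).sum(1) over a finite input. */
    public static Map<String, Long> globalSum(List<Map.Entry<String, Long>> records) {
        Map<String, Long> sums = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            sums.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return sums;
    }

    /**
     * Local phase: emits one partial sum per key for every `windowSize`
     * records of that key. Leftover records are not emitted in this sketch.
     */
    public static List<Map.Entry<String, Long>> localCountWindowSum(
            List<Map.Entry<String, Long>> records, int windowSize) {
        Map<String, Long> partialSums = new HashMap<>();
        Map<String, Integer> counts = new HashMap<>();
        List<Map.Entry<String, Long>> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> r : records) {
            String key = r.getKey();
            partialSums.merge(key, r.getValue(), Long::sum);
            int seen = counts.merge(key, 1, Integer::sum);
            if (seen == windowSize) {
                // Window is full: emit the partial sum and reset this key.
                emitted.add(Map.entry(key, partialSums.remove(key)));
                counts.remove(key);
            }
        }
        return emitted;
    }

    /** Skewed sample input: 20 records for key "hot", 5 records for key "cold". */
    public static List<Map.Entry<String, Long>> sampleInput() {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            records.add(Map.entry("hot", 1L));
            if (i % 4 == 0) {
                records.add(Map.entry("cold", 2L));
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input = sampleInput();
        List<Map.Entry<String, Long>> partials = localCountWindowSum(input, 5);

        System.out.println("records shuffled without local aggregation: " + input.size());
        System.out.println("records shuffled with local aggregation:    " + partials.size());
        System.out.println("totals match: " + globalSum(partials).equals(globalSum(input)));
    }
}
```

How much this helps in a real job depends on the skew and the window size, which is what a benchmark would need to measure.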
Hi Hequn,
Thanks for your reply.
The purpose of the localKeyBy API is to provide a tool which lets users do
pre-aggregation locally. The behavior of the pre-aggregation is
similar to that of the keyBy API.
So the three cases are different, I will describe them one by one:
1. input.keyBy(0).sum(1)
*In
Hi Vino,
Thanks for the proposal, I think it is a very good feature!
One thing I want to make sure of is the semantics of `localKeyBy`. From
the document, the `localKeyBy` API returns an instance of `KeyedStream`
which can also perform sum(), so in this case, what are the semantics for
`localKeyB
Hi Aljoscha,
I have looked at the "*Process*" section of the FLIP wiki page [1]. This mail
thread indicates that it has proceeded to the third step.
When I looked at the fourth step (the vote step), I didn't find the
prerequisites for starting the voting process.
Considering that the discussion of this fe
+1 for the FLIP, thanks vino for your efforts.
Best,
Leesf
On Wed, Jun 12, 2019 at 5:46 PM vino yang wrote:
> Hi folks,
>
> I would like to start the FLIP discussion thread about supporting local
> aggregation in Flink.
>
> In short, this feature can effectively alleviate data skew. This is the
> FLIP:
>
>
> h
Hi folks,
I would like to start the FLIP discussion thread about supporting local
aggregation in Flink.
In short, this feature can effectively alleviate data skew. This is the
FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
*Motivation* (co