Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-07-03 Thread vino yang
Hi Kurt, Sorry for the late reply. Yes, your statement basically reflects the conclusions of our conversation. Yesterday, I was busy with another proposal and some other things. Some parts of the FLIP documentation have not been fully updated. Let's listen to the further opinion after many developer

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-07-03 Thread Kurt Young
Hi all, I have talked to vino offline to try to get a better understanding of the motivation and functionality he wants this FLIP to have. It seems there were indeed some misunderstandings in the previous discussions. I will try to summarize the original scenario and requirements from vino and s

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-30 Thread vino yang
Hi Jark, > I will call them "local keyed state" because they have different semantics with keyed state, even if "ListState", "MapState" are keyed state primitives currently. From the point of my view, this exposes local keyed state to users. Actually, it depends on how to understand. From a certa

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-30 Thread Jark Wu
Hi Vino, > The local keyed state we introduced is not exposed to the outside! I have read your design and know the way how you implement local aggregation via local keyed state, and how the local keyed state works. Currently, Flink exposes two basic kinds of state: operator state and keyed state.

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-27 Thread vino yang
Hi Jark, *About local keyed state:* I object to moving it out of this FLIP. It's one of the ways we support local aggregation at the operator implementation level, though not the only one. I guess you have misunderstood my last reply. I was just telling you the difference between `DataStream#process

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-27 Thread Jark Wu
Hi Vino, So the difference between `DataStream.localKeyBy().process()` and `DataStream.process()` is that the former can access keyed state and the latter can only access operator state. I think it's out of the scope of designing a local aggregation API. It might be an extension of state API, i.e

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-26 Thread vino yang
Hi all, I also think it's a good idea that we need to agree on the API level first. I am sorry, we did not give some usage examples of the API in the FLIP documentation before. This may have caused some misunderstandings about the discussion of this mail thread. So, now I have added some usage e

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-26 Thread vino yang
Hi Jark, `DataStream.localKeyBy().process()` has some key differences from `DataStream.process()`. The former API receives a `KeyedProcessFunction` (sorry, my previous reply may have misled you); the latter receives a `ProcessFunction`. When you read the Javadoc of ProcessFunction, you

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-26 Thread Jark Wu
Hi Piotr, I think the state migration you raised is a good point. Having "stream.enableLocalAggregation(Trigger)" might add some implicit operators for which users can't set a uid, causing state compatibility/evolution problems. So let's put this in rejected alternatives. Hi Vino, You mentioned s

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-26 Thread Piotr Nowojski
Hi Jark and Vino, I agree fully with Jark, that in order to have the discussion focused and to limit the number of parallel topics, we should first focus on one topic. We can first decide on the API and later we can discuss the runtime details. At least as long as we keep the potential requirem

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-26 Thread vino yang
Hi Jark, Similar questions and responses have been repeated many times. Why didn't we spend more sections discussing the API? Because we try to reuse the ability of KeyedStream. The localKeyBy API just returns the KeyedStream, that's our design, we can get all the benefit from the KeyedStream an

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-25 Thread Jark Wu
Thanks for the long discussion Vino, Kurt, Hequn, Piotr and others, It seems that we still have some different ideas about the API (localKeyBy()?) and implementation details (reuse window operator? local keyed state?). And the discussion is stalled and mixed with motivation and API and implementat

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread vino yang
Hi Kurt, To answer your questions: a) Sorry, I have only updated the Google doc and have not yet had time to update the FLIP; I will update the FLIP as soon as possible. About your description at this point, I have a question, what does it mean: how do we combine with `AggregateFunction`? I have shown you the examples

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread vino yang
Hi Simon, IMO, we do not need special processing for your example scenarios; Flink suggests that users extract watermarks in the source function. Generally, IDLE is a temporary status: when data arrives, the source will send ACTIVE status downstream and the processing will continue. Keep in mind,

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread Shu Su
Hi Vino, Thanks for your reply. It seems feasible if a StreamStatus.IDLE is sent downstream. Still, two questions: 1. Do we need to add a method to allow users to control when to send StreamStatus.IDLE downstream in this case? 2. If a partial data comes after your IDLE status to downstr

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread vino yang
Hi Simon, Good question! For event time semantics, reusing the window operator keeps the correct behavior, which is the same as the current window operator. The window operator will trigger based on the watermark. About your example, the window of three partitions will trigger normally. For t

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread Shu Su
Hi vino, Thanks for the proposal. For local aggregation, I have a question about doing this in window aggregation. As we know, window aggregation such as a sliding window should be based on a time trigger, and there may be a problem in event time if we do local aggregation. For example if I want to do a

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread Kurt Young
Hi vino, One thing to add, for a), I think using one or two examples, like how to do local aggregation on a sliding window and how to do local aggregation on an unbounded aggregate, will help a lot. Best, Kurt On Mon, Jun 24, 2019 at 6:06 PM Kurt Young wrote: > Hi vino, > > I think there

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread Kurt Young
Hi vino, I think there are several things that still need discussion. a) We all agree that we should first go with a unified abstraction, but the abstraction is not reflected by the FLIP. If your answer is the "localKeyBy" API, then I would ask how do we combine with `AggregateFunction`, and how do we do

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread vino yang
Hi Kurt, You did not raise any further objections, so I thought you had agreed with the design after we promised to support two kinds of implementation. At the API level, we have answered your question about passing an AggregateFunction to do the aggregation. No matter introduce localKeyBy API

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread Kurt Young
Hi vino, Sorry, I don't see the consensus about reusing the window operator and keeping the API design of localKeyBy. But I think we should definitely give more thought to this topic. I will also try to loop in Stephan for this discussion. Best, Kurt On Mon, Jun 24, 2019 at 3:26 PM vino yang wrote: > Hi a

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-24 Thread vino yang
Hi all, I am happy that we have had a wonderful discussion and received many valuable opinions in the last few days. Now, let me try to summarize the changes in the design that we have reached consensus on. - provide a unified abstraction to support two kinds of implementation; - reuse WindowOpe

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-19 Thread vino yang
Hi Kurt, Thanks for your comments. It seems we have come to a consensus that we should alleviate the performance degradation caused by data skew with local aggregation. In this FLIP, our key solution is to introduce local keyed partition to achieve this goal. I also agree that we can benefit a lot from the usa

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-19 Thread Kurt Young
Hi all, As vino said in previous emails, I think we should first discuss and decide what kind of use cases this FLIP wants to resolve, and what the API should look like. From my side, I think this is probably the root cause of the current divergence. My understanding is (from the FLIP title and motivatio

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-19 Thread vino yang
Hi Hequn, Thanks for your comments! I agree that allowing local aggregation to reuse the window API and refining the window operator to make it match both requirements (ours and Kurt's) is a good decision! Concerning your questions: 1. The result of input.localKeyBy(0).sum(1).keyBy(0).sum(1) may

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-19 Thread Hequn Cheng
Hi, Thanks a lot for your great discussion and great to see that some agreement has been reached on the "local aggregate engine"! ===> Considering the abstract engine, I'm thinking is it valuable for us to extend the current window to meet both demands raised by Kurt and Vino? There are some bene

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-18 Thread vino yang
Hi Kurt and Piotrek, Thanks for your comments. I agree that we can provide a better abstraction to be compatible with two different implementations. First of all, I think we should consider what kind of scenarios we need to support in *API* level? We have some use cases which need to a customiz

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-18 Thread Kurt Young
Hi Piotr, Thanks for joining the discussion. Making "local aggregation" abstract enough sounds good to me; we could implement and verify alternative solutions for use cases of local aggregation. Maybe we will find both solutions are appropriate for different scenarios. Starting from a simple one so

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-18 Thread Piotr Nowojski
Hi Kurt and Vino, I think there is a trade-off that we need to consider for the local aggregation. Generally speaking I would agree with Kurt about local aggregation/pre-aggregation not using Flink's state and instead flushing the operator on a checkpoint. Network IO is usually cheaper compared to disk IO. Thi

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-18 Thread vino yang
Hi Kurt, Thanks for your reply. If you do not depend on the window operator, that means you need to provide many Trigger-related implementations like the window operator does. What's more, you worry about the complexity of the window operator but ignore the flexibility which the window operator provides for the

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-18 Thread Kurt Young
Hi, For the trigger, it depends on what operator we want to use under the API. If we choose to use the window operator, we should also use the window's trigger. However, I also think reusing the window operator for this scenario may not be the best choice. The reasons are the following: 1. As a lot of people a

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-18 Thread vino yang
Hi Kurt, Thanks for your example. Now, it looks clearer to me. From your example code snippet, I saw the localAggregate API has three parameters: 1. key field 2. PartitionAvg 3. CountTrigger: Does this trigger come from the window package? I will compare our and your design from AP

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Kurt Young
Yeah, sorry for not expressing myself clearly. I will try to provide more details to make sure we are on the same page. For DataStream API, it shouldn't be optimized automatically. You have to explicitly call API to do local aggregation as well as the trigger policy of the local aggregation. Take

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Kurt, Thanks for your reply. Actually, I am not against you raising your design. From your description before, I can only imagine that your high-level implementation is about SQL and the optimization is internal to the API. Is it automatic? how to give the configuration option about trigger pre

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Kurt Young
Hi Vino, Now I feel that we may have different understandings about what kind of problems or improvements you want to resolve. Currently, most of the feedback is focusing on *how to do a proper local aggregation to improve performance and maybe solving the data skew issue*. And my gut feeling is

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Kurt, Thanks for your comments. It seems we both implemented a local aggregation feature to mitigate the issue of data skew. However, IMHO, the API level at which the optimization benefit is delivered is different. *Your optimization benefits from Flink SQL and it's not user-facing.(If I understand it incorrectly, ple

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Jark, We have done a comparative test. The effect is obvious. From our observation, the optimization effect mainly depends on two factors: - the degree of the skew: this factor depends on the user's business; - the size of the window: localKeyBy supports all types of windows which provid

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Kurt Young
Hi Vino, Thanks for the proposal, I like the general idea and IMO it's a very useful feature. But after reading through the document, I feel that we may be over-designing the required operator for proper local aggregation. The main reason is we want to have a clear definition and behavior about the "local

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Jark Wu
Hi Vino, Thanks for the proposal. Regarding "input.keyBy(0).sum(1)" vs "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done any benchmarks? I'm curious about how much performance improvement we can get by using a count window as the local operator. Best, Jark
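
[Editor's sketch] The two pipelines compared in this message can be illustrated without Flink. The plain-Java simulation below is only a sketch under stated assumptions: `localKeyBy` is the API proposed in the FLIP, not an existing Flink method, so nothing here calls Flink; the class `LocalAggregationSketch`, its `Event` type, and its helper methods are hypothetical names; and leftover partial sums are flushed at end of input, which is a simplification of how a count window behaves. It shows that a count-based local pre-aggregation (window size 5) sends fewer records across the shuffle while the final per-key sums stay the same. It is not a benchmark and says nothing about real throughput.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalAggregationSketch {

    /** A (key, value) record; hypothetical stand-in for a two-field tuple. */
    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Simulates one upstream subtask: keeps a partial sum per key and emits it
     * once windowSize records for that key have been seen. Remaining partial
     * sums are flushed at end of input (an assumption of this sketch).
     */
    static List<Event> localPreAggregate(List<Event> input, int windowSize) {
        Map<String, Long> partialSum = new HashMap<>();
        Map<String, Integer> seen = new HashMap<>();
        List<Event> out = new ArrayList<>();
        for (Event e : input) {
            partialSum.merge(e.key, e.value, Long::sum);
            int count = seen.merge(e.key, 1, Integer::sum);
            if (count == windowSize) {
                out.add(new Event(e.key, partialSum.remove(e.key)));
                seen.remove(e.key);
            }
        }
        partialSum.forEach((k, v) -> out.add(new Event(k, v)));
        return out;
    }

    /** Simulates the downstream keyed sum over all shuffled records. */
    static Map<String, Long> globalSum(List<Event> shuffled) {
        Map<String, Long> totals = new HashMap<>();
        for (Event e : shuffled) {
            totals.merge(e.key, e.value, Long::sum);
        }
        return totals;
    }

    /** Builds n records with the given key and value 1. */
    static List<Event> ones(String key, int n) {
        List<Event> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(new Event(key, 1L));
        }
        return out;
    }

    public static void main(String[] args) {
        // Two upstream subtasks with a skewed key "a".
        List<Event> subtask0 = ones("a", 10);
        subtask0.addAll(ones("b", 1));
        List<Event> subtask1 = ones("a", 5);
        subtask1.addAll(ones("b", 2));

        // Without local aggregation every record crosses the shuffle.
        List<Event> direct = new ArrayList<>(subtask0);
        direct.addAll(subtask1);

        // With local aggregation only partial sums cross the shuffle.
        List<Event> preAggregated = new ArrayList<>(localPreAggregate(subtask0, 5));
        preAggregated.addAll(localPreAggregate(subtask1, 5));

        System.out.println("shuffled without local aggregation: " + direct.size());
        System.out.println("shuffled with local aggregation: " + preAggregated.size());
        System.out.println("totals match: "
                + globalSum(direct).equals(globalSum(preAggregated)));
    }
}
```

In this toy input the hot key "a" dominates, so most of the saving comes from collapsing its records into partial sums; how much a real job gains would depend on the skew and the window size, which is the question the benchmark request is asking.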

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Hequn, Thanks for your reply. The purpose of the localKeyBy API is to provide a tool which lets users do pre-aggregation locally. The behavior of the pre-aggregation is similar to the keyBy API. So the three cases are different, I will describe them one by one: 1. input.keyBy(0).sum(1) *In

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Hequn Cheng
Hi Vino, Thanks for the proposal, I think it is a very good feature! One thing I want to make sure is the semantics for the `localKeyBy`. From the document, the `localKeyBy` API returns an instance of `KeyedStream` which can also perform sum(), so in this case, what's the semantics for `localKeyB

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-13 Thread vino yang
Hi Aljoscha, I have looked at the "*Process*" section of FLIP wiki page.[1] This mail thread indicates that it has proceeded to the third step. When I looked at the fourth step(vote step), I didn't find the prerequisites for starting the voting process. Considering that the discussion of this fe

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-12 Thread leesf
+1 for the FLIP, thanks vino for your efforts. Best, Leesf On Wed, Jun 12, 2019 at 5:46 PM, vino yang wrote: > Hi folks, > > I would like to start the FLIP discussion thread about supporting local > aggregation in Flink. > > In short, this feature can effectively alleviate data skew. This is the > FLIP: > > > h

[DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-12 Thread vino yang
Hi folks, I would like to start the FLIP discussion thread about supporting local aggregation in Flink. In short, this feature can effectively alleviate data skew. This is the FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink *Motivation* (co