Yes Fabian, I will flesh out my design in more detail. BTW, I think the incremental aggregate I proposed (the key point being to eliminate the per-window state) should work for both processing time and event time. It just does not need a sorted state in the processing-time scenarios (need to verify).
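To make the incremental idea concrete, here is a minimal Python sketch (plain Python, not Flink code) of a bounded OVER ROWS aggregate for processing time. It keeps one accumulator per key instead of one state per window, and uses an accumulate/retract pair as discussed in this thread. The names `SumAccumulator` and `over_rows_sum` are hypothetical and exist only for illustration; records are assumed to arrive in processing-time order, so a FIFO buffer stands in for the sorted state.

```python
from collections import defaultdict, deque


class SumAccumulator:
    """Hypothetical accumulator exposing an accumulate/retract pair."""

    def __init__(self):
        self.total = 0

    def accumulate(self, value):
        self.total += value

    def retract(self, value):
        self.total -= value


def over_rows_sum(records, preceding):
    """Sketch of SUM(value) OVER (PARTITION BY key
    ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW).

    `records` is an iterable of (key, value) pairs, assumed to arrive in
    processing-time order, so no sorted state is required.
    """
    buffers = defaultdict(deque)                # rows currently in scope, per key
    accumulators = defaultdict(SumAccumulator)  # one accumulator per key, not per window
    results = []
    for key, value in records:
        buf, acc = buffers[key], accumulators[key]
        buf.append(value)
        acc.accumulate(value)
        # Retract the row that just fell out of the window instead of
        # recomputing the aggregate from scratch.
        if len(buf) > preceding + 1:
            acc.retract(buf.popleft())
        results.append((key, value, acc.total))
    return results


if __name__ == "__main__":
    rows = [("a", 1), ("a", 2), ("b", 10), ("a", 3), ("a", 4)]
    for row in over_rows_sum(rows, preceding=2):
        print(row)
```

Each incoming row costs one accumulate and at most one retract, regardless of the window size. For event time, the FIFO buffer would have to be replaced by a sorted state so that late rows can be placed correctly.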

Regards,
Shaoxuan

On Wed, Jan 25, 2017 at 5:55 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi everybody,
>
> thanks for the great discussions so far. It's awesome to see so much
> interest in this topic!
>
> First, I'd like to comment on the development process for this feature
> and later on the design of the runtime:
>
> Dev Process
> ----
> @Shaoxuan, I completely agree with you. We should first come up with good
> designs for the runtime operators of the different window types. Once we
> have that, we can start implementing the operators and integrate them with
> Calcite's optimization. This will be an intermediate step and as a
> byproduct give us support for SQL OVER windows. Once this is done, we can
> extend the Table API and translate the Table API calls into the same
> RelNodes as Calcite's SQL parser does.
>
> Runtime Design
> ----
> I think it makes sense to distinguish the different types of OVER windows
> because they have different requirements which result in different runtime
> implementations (with different implementation complexity and performance).
> In a previous mail I proposed to split the support for OVER windows into
> the following subtasks:
>
> # bounded PRECEDING
> - OVER ROWS for processing time
>   - does not require sorted state (data always arrives in processing time
>     order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on row count
>   - A GlobalWindow with evictor + trigger might be the best implementation
>     (basically the same as DataStream.countWindow(long, long)). We need to
>     add timeouts to clean up state for unused keys though.
>
> - OVER RANGE for processing time
>   - does not require sorted state (data always arrives in processing time
>     order)
>   - no need to consider retraction (processing time is never late)
>   - defines windows on a time range
>   - I think this could also be implemented with a GlobalWindow with
>     evictor + trigger (need to verify)
>
> - OVER RANGE for event time
>   - needs sorted state (late data possible)
>   - IMO, a ProcessFunction gives us the most flexibility in adding later
>     features (retraction, update rate, etc.)
>   - @Shaoxuan, you sketched a good design. Would you like to continue with
>     a design proposal?
>
> # UNBOUNDED PRECEDING
> Similar considerations apply for the UNBOUNDED PRECEDING cases of the
> above window types.
>
> If we all agree that the separation into six JIRAs (bounded/unbounded *
> row-pt/range-pt/range-et) makes sense, I would suggest to move the
> discussions about the design of the implementation to the individual
> JIRAs.
>
> What do you think?
>
> Best, Fabian
>
> 2017-01-25 9:19 GMT+01:00 Shaoxuan Wang <wshaox...@gmail.com>:
>
> > Hi Liuxinchun,
> > I am not sure where you got the impression that anyone has suggested
> > "to process Event time window in Sliding Row Window". If you were
> > referring to my post, there may be some misunderstanding. I think you
> > were asking a similar question to Hongyuhong's. I have just replied to
> > him. Please take a look and let me know if that makes sense to you.
> > "Retraction" is an important building block to compute correct
> > incremental results in streaming. It is another big topic; we should
> > discuss it in another thread.
> >
> > Regards,
> > Shaoxuan
> >
> > On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun <liuxinc...@huawei.com> wrote:
> >
> > > I don't think it is a good idea to process Event time window in
> > > Sliding Row Window. In a Sliding Time window, when an element is
> > > late, we can trigger the recalculation of the related windows.
> > > And the sliding period is coarse-grained, so only size/slide windows
> > > need to be recalculated. But in a Sliding Row Window, the calculation
> > > is triggered for every arriving element, so the sliding period becomes
> > > fine-grained. When an element is late, a great many "windows" are
> > > affected. Even if we store all the raw data, the amount of
> > > recomputation is very large.
> > >
> > > I wonder whether it is possible to set a standard for the sliding
> > > Event Time Row Window: when certain elements are late, we recalculate
> > > only some of the windows and permit some error. For example, we could
> > > recalculate only the windows that end in the range
> > > (lateElement.timestamp - leftDelta, lateElement.timestamp] and those
> > > that begin in the range
> > > [lateElement.timestamp, lateElement.timestamp + rightDelta).
> > > ////////////////////////////////////////////////////////////
> > > Hi everyone,
> > > Thanks for this great discussion, and glad to see more and more people
> > > are interested in stream SQL & the Table API.
> > >
> > > IMO, the key problems for the OVER window design are the SQL semantics
> > > and the runtime design. I totally agree with Fabian that we should
> > > skip the design of TumbleRows and SessionRows windows for now, as they
> > > are not well defined in SQL semantics.
> > >
> > > Runtime design is the most crucial part we are interested in and have
> > > volunteered to contribute to. We have thousands of machines running
> > > Flink streaming jobs. The costs in terms of CPU, memory, and state are
> > > vital factors that we have to take into account. We have been working
> > > on the design of the OVER window over the past months and are planning
> > > to send out a detailed design doc to DEV quite soon. But since Fabian
> > > started a good discussion on the OVER window, I would like to share
> > > our ideas/thoughts about the runtime design for the OVER window.
> > >
> > >    1. As SunJincheng pointed out earlier, sliding window does not work
> > >    for unbounded preceding; we need an alternative approach for the
> > >    unbounded OVER window.
> > >    2. Though sliding window may work for some cases of bounded windows,
> > >    it is not very efficient and therefore should not be used in
> > >    production. To the best of my understanding, the current runtime
> > >    implementation of sliding window does not leverage the concept of
> > >    state panes yet. This means that if we use sliding window for the
> > >    OVER window, a backend state will be created per group (partition
> > >    by) and per row, and whenever a new record arrives, it will be
> > >    accumulated into all the existing windows that have not been
> > >    closed. This would cause quite a lot of overhead in terms of CPU as
> > >    well as memory and state.
> > >    3. Fabian has mentioned an approach of leveraging “ProcessFunction”
> > >    and a “sortedState”. I like this idea. The design details are not
> > >    quite clear yet, so I would like to add more thoughts on this.
> > >    Regardless of which DataStream API we are going to use (it is very
> > >    likely that we need a new API), we should come up with an optimal
> > >    approach. The purpose of grouping window and over window is to
> > >    partition the data such that we can generate the aggregate results.
> > >    So when we talk about the design of the OVER window, we have to
> > >    think about the aggregates. As we proposed in our recent UDAGG doc
> > >    https://goo.gl/6ntclB, the user-defined accumulator will be stored
> > >    in the aggregate state. Besides the accumulator, we have also
> > >    introduced a retract API for UDAGG. With the aggregate accumulator
> > >    and the retract API, I am proposing a runtime approach to implement
> > >    the OVER window as follows:
> > >       - We first implement a sorted state interface.
> > >       - For each group, we create just one sorted state. When a new
> > >       record arrives, it is inserted into this sorted state and, at
> > >       the same time, accumulated into the aggregate accumulator.
> > >       - For the over window, we keep the aggregate accumulator for the
> > >       entire lifetime of the job. This is different from the case
> > >       where we delete the accumulator for each group/window when a
> > >       grouping window is finished.
> > >       - When an over window is about to trigger, we grab the previous
> > >       accumulator from the state, accumulate onto it all the records
> > >       up to the upperBoundary of the current window, and retract all
> > >       the out-of-scope records up to its lowerBoundary. We emit the
> > >       aggregate result and save the accumulator for the next window.
> > >
> > > Hello Fabian,
> > > I would suggest we first start working on the runtime design of over
> > > window and aggregate. Once we have a good design there, one can easily
> > > add the support for SQL as well as the Table API. What do you think?
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > > On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Hi Radu,
> > > >
> > > > thanks for your comments!
> > > >
> > > > Yes, my intention is to open new JIRA issues to structure the
> > > > development process. Everybody is very welcome to pick up issues and
> > > > discuss the design proposals.
> > > > At the moment I see the following six issues to start with:
> > > >
> > > > - streaming SQL OVER ROW for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for processing time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > - streaming SQL OVER RANGE for event time
> > > >   - bounded PRECEDING
> > > >   - unbounded PRECEDING
> > > >
> > > > For each of these windows we need corresponding translation rules
> > > > and execution code.
> > > >
> > > > Subsequent JIRAs would be
> > > > - extending the Table API for supported SQL windows
> > > > - add support for FOLLOWING
> > > > - etc.
> > > >
> > > > Regarding the requirement for a sorted state: I am not sure if the
> > > > OVER windows should be implemented using Flink's DataStream window
> > > > framework. We need a good design document to figure out what the
> > > > best approach is. A ProcessFunction with a sorted state might be a
> > > > good solution as well.
> > > >
> > > > Best, Fabian
> > > >
> > > > 2017-01-24 10:41 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Thanks for starting this discussion - it is very useful.
> > > > > It does indeed make sense to refactor all of these and coordinate
> > > > > the efforts a bit, so that we do not end up with overlapping
> > > > > implementations and incompatible solutions.
> > > > >
> > > > > If you close the 3 jira issues you mentioned - do you plan to
> > > > > redesign them and open new ones? Do you need help from our side -
> > > > > we can also pick up the redesign of some of these new jira issues.
> > > > > For example we already have an implementation for this and we can
> > > > > help with the design. Nevertheless, let's coordinate the effort.
> > > > >
> > > > > Regarding the support for the different types of window - I think
> > > > > the best option is to split the implementation into small units.
> > > > > We can easily do this from the transformation rule class, and with
> > > > > this each particular type of window
> > > > > (session/sliding/sliderows/processing time/...) will have a clear
> > > > > implementation and a corresponding architecture within the jira
> > > > > issue. What do you think about such a granularity?
> > > > >
> > > > > Regarding the issue of "Q4: The implementation of SlideRows still
> > > > > needs a custom operator that collects records in a priority queue
> > > > > ordered by the "rowtime", which is similar to the design we
> > > > > discussed in FLINK-4697, right?"
> > > > > Why would you need this operator? The window buffer can act to
> > > > > some extent as a priority queue as long as the trigger and evictor
> > > > > are set to work based on the rowtime - or maybe I am missing
> > > > > something... Can you please clarify this?
> > > > >
> > > > > Dr. Radu Tudoran
> > > > > Senior Research Engineer - Big Data Expert
> > > > > IT R&D Division
> > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > European Research Center
> > > > >
> > > > > -----Original Message-----
> > > > > From: Jark Wu [mailto:wuchong...@alibaba-inc.com]
> > > > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > > > To: dev@flink.apache.org
> > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > > > > Windows for streaming tables
> > > > >
> > > > > Hi Fabian,
> > > > >
> > > > > Thanks for bringing up this discussion and the nice approach to
> > > > > avoid overlapping contributions.
> > > > >
> > > > > All of these make sense to me. But I have some questions.
> > > > >
> > > > > Q1: If I understand correctly, we will not support TumbleRows and
> > > > > SessionRows at the beginning, but may support them as syntactic
> > > > > sugar (in the Table API) once SlideRows is supported in the
> > > > > future. Right?
> > > > >
> > > > > Q2: How can SessionRows be supported based on SlideRows? I don't
> > > > > get how to partition on "gap-separated".
> > > > >
> > > > > Q3: Should we break down the approach into smaller tasks for
> > > > > streaming tables and batch tables?
> > > > >
> > > > > Q4: The implementation of SlideRows still needs a custom operator
> > > > > that collects records in a priority queue ordered by the
> > > > > "rowtime", which is similar to the design we discussed in
> > > > > FLINK-4697, right?
> > > > >
> > > > > +1 to not supporting OVER ROW for event time at this point.
> > > > >
> > > > > Regards, Jark
> > > > >
> > > > > > On Jan 24, 2017, at 10:28 AM, Hongyuhong <hongyuh...@huawei.com> wrote:
> > > > > >
> > > > > > Hi,
> > > > > > We are also interested in streaming SQL and very willing to
> > > > > > participate and contribute.
> > > > > >
> > > > > > Our work is in progress, and we will also contribute to Calcite
> > > > > > to push forward the window and stream-join support.
> > > > > >
> > > > > > --------------
> > > > > > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > > > > > Sent: January 24, 2017 5:55
> > > > > > To: dev@flink.apache.org
> > > > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row
> > > > > > Windows for streaming tables
> > > > > >
> > > > > > Hi Haohui,
> > > > > >
> > > > > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > > > > function [1] once it is available (CALCITE-1345 [2]).
> > > > > > Unfortunately, this issue does not seem to be very active, so I
> > > > > > don't know what the progress is.
> > > > > >
> > > > > > I would suggest to move the discussion about group windows to a
> > > > > > separate thread and keep this one focused on the organization of
> > > > > > the SQL OVER windows.
> > > > > >
> > > > > > Best,
> > > > > > Fabian
> > > > > >
> > > > > > [1] http://calcite.apache.org/docs/stream.html
> > > > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > > > >
> > > > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai <ricet...@gmail.com>:
> > > > > >
> > > > > >> Hi Fabian,
> > > > > >>
> > > > > >> FLINK-4692 has added the support for tumbling window and we are
> > > > > >> excited to try it out and expose it as a SQL construct.
> > > > > >>
> > > > > >> Just curious -- what's your thought on the SQL syntax for
> > > > > >> tumbling windows?
> > > > > >>
> > > > > >> Implementation-wise it might make sense to think of a tumbling
> > > > > >> window as a special case of the sliding window.
> > > > > >>
> > > > > >> The problem I see is that the OVER construct might be
> > > > > >> insufficient to support all the use cases of tumbling windows.
> > > > > >> For example, it fails to express tumbling windows that have
> > > > > >> fractional time units (as pointed out in
> > > > > >> http://calcite.apache.org/docs/stream.html).
> > > > > >>
> > > > > >> It looks to me that Calcite / Azure Stream Analytics have
> > > > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address
> > > > > >> this issue.
> > > > > >>
> > > > > >> Do you think it is a good idea to follow the same conventions?
> > > > > >> Your ideas are appreciated.
> > > > > >>
> > > > > >> Regards,
> > > > > >> Haohui
> > > > > >>
> > > > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai <ricet...@gmail.com> wrote:
> > > > > >>
> > > > > >>> +1
> > > > > >>>
> > > > > >>> We are also quite interested in these features and would love
> > > > > >>> to participate and contribute.
> > > > > >>>
> > > > > >>> ~Haohui
> > > > > >>>
> > > > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hi everybody,
> > > > > >>>>
> > > > > >>>> it seems that currently several contributors are working on
> > > > > >>>> new features for the streaming Table API / SQL around row
> > > > > >>>> windows (as defined in FLIP-11 [1]) and SQL OVER-style windows
> > > > > >>>> (FLINK-4678, FLINK-4679, FLINK-4680, FLINK-5584).
> > > > > >>>> Since these efforts overlap quite a bit I spent some time
> > > > > >>>> thinking about how we can approach these features and how to
> > > > > >>>> avoid overlapping contributions.
> > > > > >>>>
> > > > > >>>> The challenge here is the following. Some of the Table API row
> > > > > >>>> windows as defined by FLIP-11 [1] are basically SQL OVER
> > > > > >>>> windows while others cannot be easily expressed as such
> > > > > >>>> (TumbleRows for row-count intervals, SessionRows).
> > > > > >>>> However, since Calcite already supports SQL OVER windows, we
> > > > > >>>> can reuse the optimization logic for some of the Table API row
> > > > > >>>> windows. I also thought about the semantics of the TumbleRows
> > > > > >>>> and SessionRows windows as defined in FLIP-11 and came to the
> > > > > >>>> conclusion that these are not well defined in FLIP-11 and
> > > > > >>>> should rather be defined as SlideRows windows with a special
> > > > > >>>> PARTITION BY clause.
> > > > > >>>>
> > > > > >>>> I propose to approach SQL OVER windows and Table API row
> > > > > >>>> windows as follows:
> > > > > >>>>
> > > > > >>>> We start with three simple cases for SQL OVER windows (not
> > > > > >>>> Table API yet):
> > > > > >>>>
> > > > > >>>> * OVER RANGE for event time
> > > > > >>>> * OVER RANGE for processing time
> > > > > >>>> * OVER ROW for processing time
> > > > > >>>>
> > > > > >>>> All cases fulfill the following restrictions:
> > > > > >>>> - All aggregations in SELECT must refer to the same window.
> > > > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > > > >>>> - ORDER BY must be on the rowtime attribute (for event time)
> > > > > >>>>   or on a marker function that indicates processing time.
> > > > > >>>>   Additional sort attributes are not supported initially.
> > > > > >>>> - Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and
> > > > > >>>>   "BETWEEN x PRECEDING AND CURRENT ROW" are supported.
> > > > > >>>>
> > > > > >>>> OVER ROW for event time cannot be easily supported. With event
> > > > > >>>> time, we may have late records which need to be injected into
> > > > > >>>> the order of records. When a record is injected into the order
> > > > > >>>> where a row-count window has already been computed, this and
> > > > > >>>> all following windows will change. We could either drop the
> > > > > >>>> record or send out many retraction records. I think it is best
> > > > > >>>> to not open this can of worms at this point.
> > > > > >>>>
> > > > > >>>> The rationale for all of the above restrictions is to have
> > > > > >>>> first versions of OVER windows soon.
> > > > > >>>> Once we have the above cases covered we can extend and remove
> > > > > >>>> limitations as follows:
> > > > > >>>>
> > > > > >>>> - Table API SlideRow windows (with the same restrictions as
> > > > > >>>>   above). This will be mostly API work since the execution
> > > > > >>>>   part has been solved before.
> > > > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > > > >>>> - Add support for different windows in SELECT. All windows
> > > > > >>>>   must be partitioned and ordered in the same way.
> > > > > >>>> - Add support for additional ORDER BY attributes (besides
> > > > > >>>>   time).
> > > > > >>>>
> > > > > >>>> As I said before, TumbleRows and SessionRows windows as in
> > > > > >>>> FLIP-11 are not well defined, IMO.
> > > > > >>>> They can be expressed as SlideRows windows with special
> > > > > >>>> partitioning (partitioning on fixed, non-overlapping time
> > > > > >>>> ranges for TumbleRows, and gap-separated, non-overlapping time
> > > > > >>>> ranges for SessionRows). I would not start to work on those
> > > > > >>>> yet.
> > > > > >>>>
> > > > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the
> > > > > >>>> development of these features as outlined above with
> > > > > >>>> corresponding JIRA issues.
> > > > > >>>>
> > > > > >>>> What do others think? (I cc'ed the contributors assigned to
> > > > > >>>> the above JIRA issues)
> > > > > >>>>
> > > > > >>>> Best, Fabian
> > > > > >>>>
> > > > > >>>> [1]
> > > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations