Hi,

I agree that the Table API should be a SQL-like API that does not need to be strictly consistent with SQL.
> - Not put the window definition into the groupBy clause.
+1. Putting the window clause outside of groupBy will make it easy to learn and easy to discover for users.

> I like the idea of having separate ".window()" and ".rowWindow()"
> clauses.
+1

> I would prefer to not have a "partitionBy" statement.
As far as I know, partitionBy plays the same role as groupBy in the Table API. So maybe groupBy is enough, and there is no need to introduce a new clause.

- Jark Wu

> On Oct 27, 2016, at 2:22 AM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi all!
>
> I think that in order to get a better hold on how we want to build the Table API, we need to *decide what the role of the Table API should be*. We touched on that a few times, but I think we still have different ideas about that.
>
> To get there, let me take a step back and look at the design of Stream SQL again. There were basically two competing approaches:
>
> (1) Keep SQL as it is and make it run on infinite streams by introducing dynamic tables
> (2) Design a new language that is similar to SQL, but designed with streaming concepts in mind (first-class support for time and windows)
>
> Both approaches had good points. The Stream SQL design doc that was posted followed approach (1): keep SQL as it is.
>
> Now, for the Table API, we seem to be having a similar discussion again.
>
> (1) Let the Table API be as similar to SQL as possible, and simply make it feel "fluently embedded" in Scala.
> (2) Define the Table API as one would define a new and clean DSL for streaming. SQL-inspired, of course. Where SQL syntax feels natural, use the SQL syntax, but make it very accessible to Java/Scala (non-SQL) programmers.
>
> I am personally more in favor of variant (2) for the following reasons:
>
> - We already have SQL, compliant with the standards and tools.
>
> - Mirroring SQL too closely in the Table API has the marginal benefit that someone close to SQL will find it a bit more familiar.
> Not sure if that is even the case, as they have to re-learn the fluent DSL and Scala concepts.
>
> - We are making it more difficult for all those who come from a Scala/Java DataStream background and simply want to move "a layer up", getting schema and more optimizations into the equation.
>
> What would that mean for the specific issues that are discussed in FLIP-11? Based on interpreting the Table API as a re-imagined streaming DSL, I would suggest to
>
> - Not put the window definition into the groupBy clause. It is just unexpected for everyone who is not super familiar with SQL, and hard to discover in the IDE. A separate window clause is simpler for users coming from the DataStream background (or other streaming APIs), and it is more discoverable in the IDE.
>
> - I like the idea of having separate ".window()" and ".rowWindow()" clauses. It makes it more explicit that very different things will happen.
>
> - I would prefer to not have a "partitionBy" statement. When we restrict the Table API, at least initially, to having one partitioning for the windows, we should be able to express the partitioning by simply adding it to the fields in the "groupBy" clause. That would make the API more accessible to users who are not SQL power users.
>
> What do others think?
>
> Greetings,
> Stephan
>
> On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Thanks for your reply Shaoxuan!
>>
>> Please see my replies below.
>>
>> Best, Fabian
>>
>> 2016-10-14 11:34 GMT+02:00 Sean Wang <wshaox...@gmail.com>:
>>
>>> Thanks for your quick reply, Fabian.
>>>
>>> I have a few minor comments & suggestions:
>>>
>>> <GroupBy without window>
>>> - Agree that we should consider GroupBy without window after the new SQL proposal is settled.
>>>
>> OK, so we keep this as it is for now. GroupBy without windows will be supported later, when we are able to "guard" the memory requirements of that operation.
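For readers coming from the DataStream side, the semantics behind Stephan's suggestion can be sketched without any SQL: the groupBy fields alone define the partitioning, and a separate window clause only assigns each row to a time bucket. The Scala snippet below is a minimal in-memory model of a tumbling group-window sum under that reading, assuming non-negative event-time timestamps in milliseconds; `Row`, `windowStart`, and `tumblingSum` are hypothetical names for illustration and are not part of Flink's Table API.

```scala
// Hypothetical toy model of a tumbling group-window aggregate; not Flink's Table API.
// Each row carries a grouping key, an event-time timestamp in ms, and a value.
final case class Row(key: String, rowtime: Long, value: Long)

// Start of the tumbling window of length `sizeMs` that contains `ts`.
// Assumes non-negative timestamps.
def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

// A separate window clause plus groupBy('key): one output row per (key, window start).
def tumblingSum(rows: Seq[Row], sizeMs: Long): Map[(String, Long), Long] =
  rows
    .groupBy(r => (r.key, windowStart(r.rowtime, sizeMs)))
    .map { case (k, rs) => k -> rs.map(_.value).sum }

val tenMinutes = 10L * 60 * 1000
val rows = Seq(
  Row("a", 0L, 1L),
  Row("a", 5L * 60 * 1000, 2L),  // same 10-minute window as the first row
  Row("a", 12L * 60 * 1000, 4L), // falls into the next window
  Row("b", 1L * 60 * 1000, 8L)
)
val result = tumblingSum(rows, tenMinutes)
```

Each (key, window start) pair yields exactly one output row, which is what makes this a group-window aggregate.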
>>
>>> <GroupBy with window>
>>> - For the Java API, we can keep the window() call and put the window alias into the groupBy clause. This can also be applied to the rowWindow case.
>>>
>> Referring to the window alias in the groupBy clause would require inverting the methods, i.e., groupBy().window() -> window().groupBy(). I am not sure if that is more intuitive. Also, Scala and Java use the same class (Table) but different methods (Java uses String parameters, Scala uses Expression parameters). In my opinion it makes sense to keep both APIs closely in sync. So I would either keep window() (after groupBy) for both Scala and Java, or remove it for both.
>>
>>> <RowWindows> & <partitionBy() for rowWindow>
>>> - +1 to support replacing groupBy() by partitionBy(). BTW, in the case of over, besides partitionBy, are we going to support orderBy? If yes, I would suggest to define rowWindow as rowWindow(PartitionByParaType, OrderByParaType, WindowParaType).
>>>
>> The current FLIP-11 proposal supports defining both partitionBy and orderBy (with a few restrictions).
>> PartitionBy is defined for all windows alike by calling
>>
>>   table.partitionBy(...).rowWindow(Window1 as w1, Window2 as w2).select(count() over w1)
>>
>> Allowing windows with different partitionings would mean that data is shuffled to different nodes and that we need a distributed join to assemble the result rows. In principle this could be done, but it would be very expensive to execute (this applies to batch and streaming). In my opinion, we should not support this case.
>>
>> OrderBy is implicitly supported by the on() clause of RowWindows, e.g.,
>>
>>   rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)
>>
>> says that the window is ordered on the rowtime, i.e., event-time, attribute. For streaming we can only allow event-time order (or processing-time order, which is always given).
>> Orders on other attributes would not be possible (for infinite dynamic input tables) or would be very expensive to maintain, memory- and computation-wise (for finite dynamic input tables). For queries on batch tables, in principle all orders are possible. With the current proposal, only count windows are supported for arbitrary attribute types, and time windows only for timestamp attributes.
>>
>>> So
>>> - moving windows into the groupBy() call: +1
>>
>> +1
>>
>>> - making over() mandatory for rowWindow() with a single window definition.
>>
>> +1
>>
>>> - additionally allowing window definitions in over(): +1, yes for Scala, but use aliases for the Java API.
>>
>> If we have the parser code for Java group windows in groupBy(), it should be easy to adapt it for over(). But we should also keep the rowWindow method to define aliases.
>>
>>> - using partitionBy() instead of groupBy() for row windows?: +1, but better to also consider orderBy; it may be even better to move partitionBy() into rowWindow.
>>
>> +1 to change groupBy() to partitionBy().
>> I would not move partitionBy() into the RowWindow definition but keep it outside, to ensure that only one partitioning is defined. The orderBy definition is already fluently included in the RowWindow via the on() method.
>>
>>> Regards,
>>> Shaoxuan
>>>
>>> On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi everybody,
>>>>
>>>> happy to see a good discussion here :-)
>>>> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question in a separate mail.
>>>>
>>>> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we should definitely follow the standard (batch) SQL syntax.
>>>> In my opinion, the Table API does not necessarily have to be as close as possible to SQL, but should try to make a few things easier and also safer (easier is of course subjective).
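Fabian's description of row windows (one partitioning shared by all windows, with the ordering given by the on('rowtime) part) can be illustrated with a small in-memory model of a count-based window covering the current row and a fixed number of preceding rows, roughly `SUM(value) OVER (PARTITION BY key ORDER BY rowtime ROWS n PRECEDING)`. This is a hypothetical sketch: `Event` and `rowsPrecedingSum` are made-up names, not Flink API, and ties in rowtime keep their input order because `sortBy` is stable.

```scala
// Hypothetical toy model of a row-window (OVER) aggregate; not Flink's Table API.
final case class Event(key: String, rowtime: Long, value: Long)

// For every input row, sums `value` over the current row and up to `preceding`
// earlier rows of the same partition, ordered by rowtime. A single partitioning
// key is shared by all windows.
def rowsPrecedingSum(events: Seq[Event], preceding: Int): Seq[(Event, Long)] =
  events
    .groupBy(_.key)                                 // the one partitionBy(...)
    .values
    .flatMap { part =>
      val ordered = part.sortBy(_.rowtime).toVector // the on('rowtime) ordering
      ordered.indices.map { i =>
        val from = math.max(0, i - preceding)
        ordered(i) -> ordered.slice(from, i + 1).map(_.value).sum
      }
    }
    .toSeq
    .sortBy { case (e, _) => (e.rowtime, e.key) }   // deterministic output order

val events = Seq(
  Event("a", 1L, 1L),
  Event("a", 2L, 2L),
  Event("a", 3L, 4L),
  Event("b", 2L, 10L)
)
val out = rowsPrecedingSum(events, preceding = 1)
```

Every input row produces exactly one output row; because partitioning happens once, before any window is evaluated, all aggregates of a row can be computed locally within its partition.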
>>>>
>>>> - GroupBy without windows: These are currently intentionally not supported and also not part of FLIP-11. Our motivation for not supporting this is to guard the user from defining a query that fails during execution due to a very memory-consuming operation. FLIP-11 provides a way to define such a query as a sliding row window with unbounded preceding rows. With the upcoming SQL proposal, queries that consume unbounded memory should be identified and rejected. I would be in favor of allowing groupBy without windows once the guarding mechanisms are in place.
>>>>
>>>> - GroupBy with window: I think this is a question of taste. Having a window() call makes the feature more explicit in my opinion. However, I'm not opposed to moving the windows into the groupBy clause. Implementation-wise, it should be easy to move the window definition into the groupBy clause for the Scala Table API. For the Java Table API we would need to extend the parser quite a bit, because windows would need to be defined as Strings and not via objects.
>>>>
>>>> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause (implemented by PostgreSQL and Calcite), which allows "reusable" window definitions. I think this is a desirable feature. In the FLIP-11 proposal, the over() clause in select() refers to the predefined windows by their aliases. In case only one window is defined, the over() clause is optional and the same (and only) window is applied to all aggregates. I think we can make the over() call mandatory to make the windowing more explicit. It should also be possible to extend the over clause to directly accept RowWindows instead of window aliases. I would not make this a priority at the moment, but rather a feature that could be added later, because rowWindow() and over() cover all cases.
>>>> Similar to GroupBy with windows, we would need to extend the parser for the Java Table API, though.
>>>>
>>>> Finally, I have a suggestion of my own:
>>>> In FLIP-11, groupBy() is used to define the partitioning of RowWindows. I think this should be changed to partitionBy(), because groupBy() groups data and applies an aggregation to all rows of a group, which is not happening here. In standard SQL, the OVER clause features a PARTITION BY clause. We are moving this out of the window definition, i.e., the OVER clause, to enforce the same partitioning for all windows (different partitionings would be a challenge to execute in a parallel system).
>>>>
>>>> @Timo and all: What do you think about:
>>>>
>>>> - moving windows into the groupBy() call
>>>> - making over() mandatory for rowWindow() with a single window definition
>>>> - additionally allowing window definitions in over()
>>>> - using partitionBy() instead of groupBy() for row windows?
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangruc...@huawei.com>:
>>>>
>>>>> Hi Shaoxuan:
>>>>>
>>>>> I think the StreamSQL must be executed in a table environment, so I call this "Table API's StreamSQL". What do you call this: stream Table API or StreamSQL? It is fu
>>>>>
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   val tblEnv = TableEnvironment.getTableEnvironment(env)
>>>>>   val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>>>>>     .map(f => (f, 1L, 1L))
>>>>>   tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>>>>>   val sql = tblEnv.sql("SELECT STREAM * FROM Order WHERE userID = 'A'")
>>>>>
>>>>> So in my opinion, the grammar which is marked red should be compatible with Calcite's StreamSQL grammar.
>>>>>
>>>>> By the way, thanks very much for telling me about the modified content in Flink StreamSQL. I will look at the new proposal.
>>>>>
>>>>> Thanks!
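Fabian's reason for not yet supporting groupBy without windows is that such a query is a row window with unbounded preceding rows, whose per-key state never expires. The hypothetical Scala sketch below (not Flink's operator code; `RunningCount` and its parameters are made up) keeps a keyed running count and, optionally, evicts keys that have been idle longer than a timeout, in the spirit of the row-window session mentioned later in the thread. With an evolving key space, the unbounded variant keeps one entry per key ever seen, while the timeout variant stays bounded.

```scala
import scala.collection.mutable

// Hypothetical sketch of a keyed running count (an unbounded-preceding row window).
// If `timeoutMs` is set, keys idle for longer than the timeout are evicted before
// each row is processed. Illustration only, not Flink's implementation.
final class RunningCount(timeoutMs: Option[Long]) {
  private val counts   = mutable.Map.empty[String, Long]
  private val lastSeen = mutable.Map.empty[String, Long]

  // Processes one row and returns the running count for its key.
  def process(key: String, rowtime: Long): Long = {
    timeoutMs.foreach { t =>
      // Collect first, then remove, so the map is not modified while iterating.
      val expired = lastSeen.collect { case (k, seen) if rowtime - seen > t => k }.toList
      expired.foreach { k => counts.remove(k); lastSeen.remove(k) }
    }
    val c = counts.getOrElse(key, 0L) + 1
    counts(key) = c
    lastSeen(key) = rowtime
    c
  }

  // Number of keys currently held in state.
  def stateSize: Int = counts.size
}

// An evolving key space: every row introduces a new key.
val unbounded   = new RunningCount(timeoutMs = None)
val withTimeout = new RunningCount(timeoutMs = Some(10L))
(0 until 100).foreach { i =>
  unbounded.process(s"k$i", i.toLong)
  withTimeout.process(s"k$i", i.toLong)
}
```

Here `unbounded.stateSize` grows with every new key, whereas `withTimeout.stateSize` only covers keys seen within the last timeout interval.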
>>>>> From: Sean Wang [mailto:wshaox...@gmail.com]
>>>>> Sent: October 13, 2016 16:29
>>>>> To: dev@flink.apache.org; Zhangrucong
>>>>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>>>>
>>>>> Hi Zhangrucong,
>>>>> I am not sure what you mean by "Table API's StreamSQL"; I guess you mean "stream Table API"?
>>>>> The Table API should be compatible with Calcite SQL. (By compatible, my understanding is that both the Table API and SQL will be translated to the same logical plan, i.e., the same set of RELs and REXs.)
>>>>> BTW, please note that we recently merged a change to remove the STREAM keyword from Flink stream SQL (FLINK-4546). In our opinion, batch and stream do not necessarily need to be differentiated at the SQL level. The major difference between batch and stream is "WHEN and HOW to emit the result".
>>>>> We have been working on a new proposal with Fabian on this change. I guess it will be sent out for review very soon.
>>>>>
>>>>> Regards,
>>>>> Shaoxuan
>>>>>
>>>>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangruc...@huawei.com> wrote:
>>>>> Hi Shaoxuan:
>>>>> Is the Table API's StreamSQL grammar compatible with Calcite's StreamSQL grammar?
>>>>>
>>>>> 1. In Calcite, tumbling and hopping windows are realized with the functions TUMBLE and HOP, which must be used with GROUP BY, like this:
>>>>>
>>>>>   SELECT
>>>>>     TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
>>>>>     productId,
>>>>>     COUNT(*) AS c,
>>>>>     SUM(units) AS units
>>>>>   FROM Orders
>>>>>   GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
>>>>>     productId;
>>>>>
>>>>> 2. The sliding window uses the keywords "window" and "over".
>>>>> Like this:
>>>>>
>>>>>   SELECT *
>>>>>   FROM (
>>>>>     SELECT STREAM rowtime,
>>>>>       productId,
>>>>>       units,
>>>>>       AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
>>>>>       AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
>>>>>     FROM Orders
>>>>>     WINDOW product AS (
>>>>>       ORDER BY rowtime
>>>>>       PARTITION BY productId))
>>>>>
>>>>> Thanks!
>>>>>
>>>>> -----Original Message-----
>>>>> From: 王绍翾(大沙) [mailto:shaoxuan....@alibaba-inc.com]
>>>>> Sent: October 13, 2016 2:03
>>>>> To: dev@flink.apache.org
>>>>> Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>>>>
>>>>> Hi Fabian, Timo, and Jark. Thanks for kicking off this FLIP. This is a really great and promising proposal. I have a few comments on the "window" operator proposed in this FLIP (I am hoping it is not too late to bring this up). First, a window is not always needed for stream aggregation. There are cases where we want to do an aggregation on a stream, while the query/emit strategy decides when to emit a streaming output. Second, a window is needed when we want to do an aggregation over a certain range, but a window is not an operator. We basically use a window to define the range for aggregation. In the Table API, a window should be defined together with the "groupBy" and "select" operators, either inside a "groupBy" operator or after an "over" clause in the "select" operator. This will make the Table API work in a similar manner to SQL.
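The m10 column of the Calcite query above is a range-based row window: for every order, it averages `units` over the orders of the same product whose rowtime lies within the ten minutes up to and including the current row's rowtime. A minimal Scala sketch of that frame over an in-memory collection follows; it assumes millisecond timestamps, includes peers with an equal rowtime, and uses the hypothetical names `Order` and `rangeAvg`. It is quadratic and only meant to show the semantics, not how a streaming engine would evaluate it.

```scala
// Hypothetical sketch of
//   AVG(units) OVER (PARTITION BY productId ORDER BY rowtime
//                    RANGE INTERVAL '10' MINUTE PRECEDING)
// over an in-memory collection. rowtime is in milliseconds; the frame runs from
// (current rowtime - range) up to and including the current rowtime, so peers count.
final case class Order(rowtime: Long, productId: Int, units: Long)

def rangeAvg(orders: Seq[Order], rangeMs: Long): Seq[(Order, Double)] =
  orders.map { o =>
    val frame = orders.filter { p =>
      p.productId == o.productId &&
      p.rowtime <= o.rowtime &&
      p.rowtime >= o.rowtime - rangeMs
    }
    // The frame is never empty: it always contains o itself.
    o -> frame.map(_.units).sum.toDouble / frame.size
  }

val minute = 60000L
val orders = Seq(
  Order(0L, 1, 10L),
  Order(5 * minute, 1, 20L),
  Order(12 * minute, 1, 30L), // the order at 0 is now outside the 10-minute frame
  Order(6 * minute, 2, 100L)
)
val avgs = rangeAvg(orders, 10 * minute)
```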
>>>>> For instance:
>>>>>
>>>>> [A groupBy without window]
>>>>> <Table API>
>>>>>   val res = tab
>>>>>     .groupBy('a)
>>>>>     .select('a, 'b.sum)
>>>>> <SQL>
>>>>>   SELECT a, SUM(b)
>>>>>   FROM tab
>>>>>   GROUP BY a
>>>>>
>>>>> [A tumble window inside groupBy]
>>>>> <Table API>
>>>>>   val res = tab
>>>>>     .groupBy('a, tumble(10.minutes, 'rowtime))
>>>>>     .select('a, 'b.sum)
>>>>> <SQL>
>>>>>   SELECT a, SUM(b)
>>>>>   FROM tab
>>>>>   GROUP BY a, TUMBLE(10.minutes, 'rowtime)
>>>>>
>>>>> [A row tumble window after OVER]
>>>>> <Table API>
>>>>>   .groupBy('a) // optional
>>>>>   .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
>>>>> <SQL>
>>>>>   SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
>>>>>   FROM tab
>>>>>   GROUP BY a
>>>>>
>>>>> Please let me know what you think.
>>>>> Regards,
>>>>> Shaoxuan
>>>>>
>>>>> ------------------------------------------------------------------
>>>>> From: Fabian Hueske <fhue...@gmail.com>
>>>>> Sent: Monday, September 26, 2016 21:13
>>>>> To: dev@flink.apache.org
>>>>> Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
>>>>>
>>>>> Hi everybody,
>>>>>
>>>>> Timo proposed our FLIP-11 a bit more than three weeks ago.
>>>>> I will update the status of the FLIP to accepted.
>>>>>
>>>>> Thanks,
>>>>> Fabian
>>>>>
>>>>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org>:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> yes, I think enough time has passed. We can start implementing the changes.
>>>>>> What do you think, Fabian?
>>>>>>
>>>>>> If there are no objections, I will create the subtasks in Jira today. For FLIP-11/1 I have already implemented a prototype; I just have to do some refactoring/documentation before opening a PR.
>>>>>>
>>>>>> Timo
>>>>>>
>>>>>> On 18/09/16 at 04:46, Jark Wu wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> It seems that there are no objections to the window design. So could we open subtasks to start working on it now?
>>>>>>>
>>>>>>> - Jark Wu
>>>>>>>
>>>>>>> On Sep 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
>>>>>>>>
>>>>>>>> Hi Fabian,
>>>>>>>>
>>>>>>>> Thanks for sharing your ideas.
>>>>>>>>
>>>>>>>> They all make sense to me. Regarding reassigning timestamps, I do not have a use case. I came up with this because DataStream has a TimestampAssigner :)
>>>>>>>>
>>>>>>>> +1 for this FLIP.
>>>>>>>>
>>>>>>>> - Jark Wu
>>>>>>>>
>>>>>>>> On Sep 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> thanks for your comments and questions!
>>>>>>>>> Actually, you are bringing up the points that Timo and I discussed the most when designing the FLIP ;-)
>>>>>>>>>
>>>>>>>>> - We also thought about the syntactic shortcut for running aggregates like you proposed (table.groupBy('a).select(...)). Our motivation for not allowing this shortcut is to prevent users from accidentally performing a "dangerous" operation. The problem with unbounded sliding row windows is that their state never expires. If you have an evolving key space, you will likely run into problems at some point because the operator state grows too large. IMO, a row-window session is a better approach, because it defines a timeout after which state can be discarded. groupBy.select is a very common operation in batch, but its semantics in streaming are very different. In my opinion it makes sense to make users aware of these differences through the API.
>>>>>>>>>
>>>>>>>>> - Reassigning timestamps and watermarks is a very delicate issue.
>>>>>>>>> You are right that Calcite exposes this field, which is necessary due to the semantics of SQL. However, also in Calcite you cannot freely choose the timestamp attribute for streaming queries (it must be a monotone or quasi-monotone attribute), which is hard to reason about (and guarantee) after a few operators have been applied. Streaming tables in Flink will likely have a time attribute which is identical to the initial rowtime. However, Flink does modify timestamps internally, e.g., for records that are emitted from time windows, in order to ensure that consecutive windows perform as expected. Modifying or reassigning timestamps in the middle of a job can lead to unexpected results which are very hard to reason about. Do you have a concrete use case in mind for reassigning timestamps?
>>>>>>>>>
>>>>>>>>> - The idea to represent rowtime and systime as objects is good. Our motivation to go for reserved Scala symbols was to have a uniform syntax for windows over streaming and batch tables. On batch tables you can compute time windows over basically every time attribute (they are treated similarly to grouping attributes, with a bit of extra logic to extract the grouping key for sliding and session windows). If you write window(Tumble over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate event time. On a batch table with a 'rowtime attribute, the same operator would be internally converted into a group by.
>>>>>>>>> By going for the object approach we would lose this compatibility (or would need to introduce an additional column attribute to specify the window attribute for batch tables).
>>>>>>>>>
>>>>>>>>> As usual, some of the design decisions are based on preferences.
>>>>>>>>> Do they make sense to you? Let me know what you think.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I was on vacation for about five days, sorry to have missed this great FLIP.
>>>>>>>>>>
>>>>>>>>>> Yes, non-windowed aggregates are a special case of row windows. And the proposal looks really good. Can we have a simplified form for this special case? For example, table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(...) could be simplified to table.groupBy('a).select(...). The latter would actually call the former.
>>>>>>>>>>
>>>>>>>>>> Another question is about the rowtime. As the FLIP says, DataStream and StreamTableSource are responsible for assigning timestamps and watermarks; furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is different from Calcite's rowtime, which is a real column in the table. With the FLIP's approach, we will lose some flexibility, because the timestamp column may be created after some transformations or a join operation, not at the beginning. So why do we have to define rowtime at the beginning? (Because of watermarks?)
>>>>>>>>>> Can we have a way to define rowtime after the source table, like a TimestampAssigner?
>>>>>>>>>>
>>>>>>>>>> Regarding the "allowLateness" method, I came up with a trick: we can make 'rowtime and 'systemtime Scala objects, not symbol expressions. The API would look like this:
>>>>>>>>>>
>>>>>>>>>>   window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>>>>>>>>>
>>>>>>>>>> The implementation would look like this:
>>>>>>>>>>
>>>>>>>>>>   class TumblingWindow(size: Expression) extends Window {
>>>>>>>>>>     def on(time: rowtime.type): TumblingEventTimeWindow =
>>>>>>>>>>       new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method
>>>>>>>>>>
>>>>>>>>>>     def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>>>>>>>>>       new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
>>>>>>>>>>   }
>>>>>>>>>>   object rowtime
>>>>>>>>>>   object systemtime
>>>>>>>>>>
>>>>>>>>>> What do you think about this?
>>>>>>>>>>
>>>>>>>>>> - Jark Wu
>>>>>>>>>>
>>>>>>>>>> On Sep 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I thought about the API of the FLIP again. If we allow the "systemtime" attribute, we cannot implement a nice method chaining where the user can define an "allowLateness" only on event time. So even if the user expressed that "systemtime" is used, we have to offer an "allowLateness" method, because we have to assume that this attribute can also be the batch event-time column, which is not very nice.
>>>>>>>>>>>
>>>>>>>>>>>   class TumblingWindow(size: Expression) extends Window {
>>>>>>>>>>>     def on(timeField: Expression): TumblingEventTimeWindow =
>>>>>>>>>>>       new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> What do you think?
>>>>>>>>>>>
>>>>>>>>>>> Timo
>>>>>>>>>>>
>>>>>>>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>
>>>>>>>>>>>> you had asked for non-windowed aggregates in the Table API a few times.
>>>>>>>>>>>> FLIP-11 proposes row-window aggregates, which are a generalization of running aggregates (SlideRow unboundedPreceding).
>>>>>>>>>>>>
>>>>>>>>>>>> Can you have a look at the FLIP and give feedback on whether this is what you are looking for?
>>>>>>>>>>>> Improvement suggestions are very welcome as well.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you,
>>>>>>>>>>>> Fabian
>>>>>>>>>>>>
>>>>>>>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>>>>>>>>>>>>> You can find FLIP-11 here:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>>>>>>>>>
>>>>>>>>>>>>> Motivation for the FLIP:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We are looking forward to your feedback.
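The two aggregate types in the FLIP motivation differ most visibly in output cardinality: a group-window aggregate emits one row per group and window, while a row-window aggregate emits one row per input row. The small Scala sketch below contrasts both on the same bounded input; `Click`, `windowSize`, and the value names are hypothetical, and the row window shown is the unbounded-preceding case. Both computations are naive and quadratic at worst, chosen only for readability.

```scala
// Hypothetical illustration of the two aggregate kinds on a bounded, in-memory input;
// all names are made up for this sketch and are not Flink API.
final case class Click(user: String, rowtime: Long)

val clicks = Seq(Click("u1", 1L), Click("u1", 4L), Click("u2", 6L), Click("u1", 7L))
val windowSize = 5L // tumbling group window of 5 time units

// Group-window aggregate: one output row per (user, window start).
val groupWindowCounts: Map[(String, Long), Int] =
  clicks
    .groupBy(c => (c.user, c.rowtime - c.rowtime % windowSize))
    .map { case (k, cs) => k -> cs.size }

// Row-window aggregate: one output row per input row, counting the current row
// and all preceding rows of the same user (unbounded preceding).
val rowWindowCounts: Seq[(Click, Int)] =
  clicks.sortBy(_.rowtime).map { c =>
    c -> clicks.count(p => p.user == c.user && p.rowtime <= c.rowtime)
  }
```

Four input rows yield three group-window results here but four row-window results.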
>>>>>>>>>>>>>
>>>>>>>>>>>>> Timo
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Kind Regards
>>>>>>>>>>>
>>>>>>>>>>> Timo Walther
>>>>>>>>>>>
>>>>>>>>>>> Follow me: @twalthr
>>>>>>>>>>> https://www.linkedin.com/in/twalthr
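Jark's proposal in the thread relies on a standard Scala technique: when `rowtime` and `systemtime` are singleton objects rather than symbols, `on` can be overloaded on their singleton types, so the compiler selects the window type and `allowLateness` is only available on event-time windows. The self-contained sketch below mimics the shape of that proposal with hypothetical classes (sizes and lateness are plain milliseconds instead of Table API expressions); it is not Flink code.

```scala
// Hypothetical sketch of the singleton-object overloading trick; these classes only
// mimic the shape of the proposal and are not Flink classes.
object rowtime
object systemtime

final case class TumblingEventTimeWindow(sizeMs: Long, latenessMs: Long = 0L) {
  // Only event-time windows offer allowLateness.
  def allowLateness(ms: Long): TumblingEventTimeWindow = copy(latenessMs = ms)
}

final case class TumblingProcessingTimeWindow(sizeMs: Long) // no allowLateness

final class TumblingWindow(sizeMs: Long) {
  // Overloads are resolved on the singleton type of the argument.
  def on(time: rowtime.type): TumblingEventTimeWindow = TumblingEventTimeWindow(sizeMs)
  def on(time: systemtime.type): TumblingProcessingTimeWindow = TumblingProcessingTimeWindow(sizeMs)
}

object Tumble {
  def over(sizeMs: Long): TumblingWindow = new TumblingWindow(sizeMs)
}

// Infix chaining parses left to right: ((Tumble over 600000L) on rowtime) allowLateness 1000L
val eventTime = Tumble over 600000L on rowtime allowLateness 1000L
val procTime  = Tumble over 600000L on systemtime
// (Tumble over 600000L on systemtime) allowLateness 1000L  // would not compile
```

The trade-off, as Fabian points out in the thread, is that time attributes would no longer be ordinary columns, which weakens the uniform window syntax for streaming and batch tables.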