Hi Zhangrucong,

yes, we want to use Calcite's SQL parser including its window syntax, i.e.,

- the standard SQL OVER windows (in streaming with a few restrictions, such as no different partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION).

The GroupBy window functions are not implemented in Calcite yet. There is CALCITE-1345 [1] to track the issue.

As Shaoxuan mentioned, we are not using the STREAM keyword to be SQL compliant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345

2016-10-13 12:05 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi everybody,
>
> happy to see a good discussion here :-)
> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question
> in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we
> should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close as
> possible to SQL but should try to make a few things easier and also safer
> (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not supported
> and also not part of FLIP-11. Our motivation for not supporting this is to
> guard the user from defining a query that fails when being executed due to
> a very memory-consuming operation. FLIP-11 provides a way to define such a
> query as a sliding row window with unbounded preceding rows. With the
> upcoming SQL proposal, queries that consume unbounded memory should be
> identified and rejected. I would be in favor of allowing groupBy without
> windows once the guarding mechanisms are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
> window() call makes the feature more explicit in my opinion. However, I'm
> not opposed to moving the windows into the groupBy clause.
> Implementation-wise it should be easy to move the window definition into the
> groupBy clause for the Scala Table API.
> For the Java Table API we would
> need to extend the parser quite a bit because windows would need to be
> defined as Strings and not via objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
> (implemented by PostgreSQL and Calcite) which allows having "reusable"
> window definitions. I think this is a desirable feature. In the FLIP-11
> proposal the over() clause in select() refers to the predefined windows
> with aliases. In case only one window is defined, the over() clause is
> optional and the same (and only) window is applied to all aggregates. I
> think we can make the over() call mandatory to have the windowing more
> explicit. It should also be possible to extend the over clause to directly
> accept RowWindows instead of window aliases. I would not make this a
> priority at the moment, but a feature that could be added later, because
> rowWindow() and over() cover all cases. Similar to GroupBy with
> windows, we would need to extend the parser for the Java Table API though.
>
> Finally, I have a suggestion of my own:
> In FLIP-11, groupBy() is used to define the partitioning of RowWindows. I
> think this should be changed to partitionBy() because groupBy() groups data
> and applies an aggregation to all rows of a group, which is not happening
> here. In original SQL, the OVER clause features a PARTITION BY clause. We
> are moving this out of the window definition, i.e., the OVER clause, to enforce
> the same partitioning for all windows (different partitionings would be a
> challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() mandatory for rowWindow() with a single window definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangruc...@huawei.com>:
>
>> Hi shaoxuan:
>>
>> I think the StreamSQL must be executed in a table environment. So I call
>> this the Table API's StreamSQL. What do you call this, stream Table API or
>> StreamSQL? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> // map each input line to a (String, Long, Long) tuple before registering it
>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>>   .map(f => (f, 1L, 1L))
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>
>> So in my opinion, the grammar which is marked red should be compatible
>> with Calcite's StreamSQL grammar.
>>
>> By the way, thanks very much for telling me about the modified content in
>> Flink StreamSQL. I will look at the new proposal.
>>
>> Thanks!
>> From: Sean Wang [mailto:wshaox...@gmail.com]
>> Sent: October 13, 2016 16:29
>> To: dev@flink.apache.org; Zhangrucong
>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi zhangrucong,
>> I am not sure what you mean by "table API'S StreamSQL"; I guess you mean
>> "stream TableAPI"?
>> TableAPI should be compatible with Calcite SQL. (By compatible, my
>> understanding is that both TableAPI and SQL will be translated to the same
>> logical plan - the same set of REL and REX).
>> BTW, please note that we recently merged a change to remove the STREAM
>> keyword for Flink stream SQL (FLINK-4546). In our opinion, batch and stream
>> do not necessarily need to be differentiated at the SQL level. The major
>> difference between batch and stream is "WHEN and HOW to emit the result".
>> We have been working on a new proposal with Fabian on this change. I
>> guess it will be sent out for review very soon.
>>
>> Regards,
>> Shaoxuan
>>
>>
>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangruc...@huawei.com> wrote:
>> Hi shaoxuan:
>> Is the Table API's StreamSQL grammar compatible with Calcite's
>> StreamSQL grammar?
>>
>>
>> 1. In Calcite, the tumble window is realized by using the function TUMBLE or
>> HOP. The function must be used with GROUP BY, like this:
>>
>> SELECT
>>   TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
>>   productId,
>>   COUNT(*) AS c,
>>   SUM(units) AS units
>> FROM Orders
>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
>>   productId;
>>
>> 2. The sliding window uses the keywords "window" and "over", like this:
>>
>> SELECT *
>> FROM (
>>   SELECT STREAM rowtime,
>>     productId,
>>     units,
>>     AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
>>     AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
>>   FROM Orders
>>   WINDOW product AS (
>>     ORDER BY rowtime
>>     PARTITION BY productId))
>>
>>
>>
>> Thanks!
>>
>> -----Original Message-----
>> From: 王绍翾(大沙) [mailto:shaoxuan....@alibaba-inc.com]
>> Sent: October 13, 2016 2:03
>> To: dev@flink.apache.org
>> Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi Fabian, Timo, and Jark. Thanks for kicking off this FLIP. This is a
>> really great and promising proposal. I have a few comments on the "window"
>> operator proposed in this FLIP (I hope it is not too late to bring this
>> up). First, a window is not always needed for stream aggregation. There
>> are cases where we want to do an aggregation on a stream, while the query/emit
>> strategy decides when to emit a streaming output. Second, a window is needed
>> when we want to do an aggregation over a certain range, but a window is not an
>> operator. We basically use a window to define the range for aggregation.
>> In the Table API, a window should be defined together with the "groupby" and "select"
>> operators, either inside a "groupby" operator or after an "over" clause in
>> the "select" operator. This keeps the Table API similar to SQL.
>> For instance:
>>
>> [A groupby without window]
>> <Table API>
>> val res = tab
>>   .groupBy('a)
>>   .select('a, 'b.sum)
>> <SQL>
>> SELECT a, SUM(b)
>> FROM tab
>> GROUP BY a
>>
>> [A tumble window inside groupby]
>> <Table API>
>> val res = tab
>>   .groupBy('a, tumble(10.minutes, 'rowtime))
>>   .select('a, 'b.sum)
>> <SQL>
>> SELECT a, SUM(b)
>> FROM tab
>> GROUP BY a, TUMBLE(10.minutes, 'rowtime)
>>
>> [A row tumble window after OVER]
>> <Table API>
>> .groupby('a) //optional
>> .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
>> <SQL>
>> SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
>> FROM tab
>> GROUP BY a
>>
>> Please let me know what you think.
>> Regards, Shaoxuan
>> ------------------------------------------------------------------
>> From: Fabian Hueske <fhue...@gmail.com>
>> Sent: Monday, September 26, 2016 21:13
>> To: dev@flink.apache.org
>> Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi everybody,
>>
>> Timo proposed our FLIP-11 a bit more than three weeks ago.
>> I will update the status of the FLIP to accepted.
>>
>> Thanks,
>> Fabian
>>
>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org>:
>>
>> > Hi Jark,
>> >
>> > yes I think enough time has passed. We can start implementing the
>> > changes.
>> > What do you think Fabian?
>> >
>> > If there are no objections, I will create the subtasks in Jira today.
>> > For FLIP-11/1 I already have implemented a prototype, I just have to do
>> > some refactoring/documentation before opening a PR.
>> >
>> > Timo
>> >
>> >
>> > On 18/09/16 at 04:46, Jark Wu wrote:
>> >
>> >> Hi all,
>> >>
>> >> It seems that there are no objections to the window design.
>> >> So could we open subtasks to start working on it now?
>> >>
>> >> - Jark Wu
>> >>
>> >> On September 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
>> >>
>> >>> Hi Fabian,
>> >>>
>> >>> Thanks for sharing your ideas.
>> >>>
>> >>> They all make sense to me. Regarding reassigning timestamps, I do not
>> >>> have a use case. I came up with this because DataStream has a
>> >>> TimestampAssigner :)
>> >>>
>> >>> +1 for this FLIP.
>> >>>
>> >>> - Jark Wu
>> >>>
>> >>> On September 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> >>>
>> >>>> Hi,
>> >>>>
>> >>>> thanks for your comments and questions!
>> >>>> Actually, you are bringing up the points that Timo and I discussed the
>> >>>> most when designing the FLIP ;-)
>> >>>>
>> >>>> - We also thought about the syntactic shortcut for running aggregates like
>> >>>> you proposed (table.groupBy('a).select(…)). Our motivation to not allow
>> >>>> this shortcut is to prevent users from accidentally performing a
>> >>>> "dangerous" operation. The problem with unbounded sliding row-windows is
>> >>>> that their state never expires. If you have an evolving key space, you
>> >>>> will likely run into problems at some point because the operator state
>> >>>> grows too large. IMO, a row-window session is a better approach, because it
>> >>>> defines a timeout after which state can be discarded. groupBy.select is a
>> >>>> very common operation in batch but its semantics in streaming are very
>> >>>> different. In my opinion it makes sense to make users aware of these
>> >>>> differences through the API.
>> >>>>
>> >>>> - Reassigning timestamps and watermarks is a very delicate issue.
>> >>>> You are right that Calcite exposes this field, which is necessary due to the
>> >>>> semantics of SQL. However, even in Calcite you cannot freely choose the
>> >>>> timestamp attribute for streaming queries (it must be a monotone or
>> >>>> quasi-monotone attribute), which is hard to reason about (and guarantee)
>> >>>> after a few operators have been applied. Streaming tables in Flink will
>> >>>> likely have a time attribute which is identical to the initial rowtime.
>> >>>> However, Flink does modify timestamps internally, e.g., for records that
>> >>>> are emitted from time windows, in order to ensure that consecutive windows
>> >>>> perform as expected. Modifying or reassigning timestamps in the middle of a job
>> >>>> can lead to unexpected results which are very hard to reason about. Do
>> >>>> you have a concrete use case in mind for reassigning timestamps?
>> >>>>
>> >>>> - The idea to represent rowtime and systime as objects is good. Our
>> >>>> motivation to go for reserved Scala symbols was to have a uniform syntax
>> >>>> with windows over streaming and batch tables. On batch tables you can
>> >>>> compute time windows basically over every time attribute (they are treated
>> >>>> similarly to grouping attributes with a bit of extra logic to extract the
>> >>>> grouping key for sliding and session windows). If you write
>> >>>> window(Tumble over 10.minutes on 'rowtime) on a streaming table, 'rowtime would
>> >>>> indicate event-time. On a batch table with a 'rowtime attribute, the same operator
>> >>>> would be internally converted into a group by. By going for the object
>> >>>> approach we would lose this compatibility (or would need to introduce an
>> >>>> additional column attribute to specify the window attribute for batch
>> >>>> tables).
>> >>>>
>> >>>> As usual some of the design decisions are based on preferences.
>> >>>> Do they make sense to you? Let me know what you think.
>> >>>>
>> >>>> Best, Fabian
>> >>>>
>> >>>>
>> >>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>> >>>>
>> >>>>> Hi all,
>> >>>>>
>> >>>>> I'm on vacation for about five days, sorry to have missed this great
>> >>>>> FLIP.
>> >>>>>
>> >>>>> Yes, the non-windowed aggregate is a special case of the row-window. And the
>> >>>>> proposal looks really good. Can we have a simplified form for the special
>> >>>>> case? For example,
>> >>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>> >>>>> can be simplified to table.groupBy('a).select(…). The latter will actually
>> >>>>> call the former.
>> >>>>>
>> >>>>> Another question is about the rowtime. As the FLIP says, DataStream and
>> >>>>> StreamTableSource are responsible for assigning timestamps and watermarks;
>> >>>>> furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is
>> >>>>> different from Calcite's rowtime, which is a real column in the table. In
>> >>>>> the FLIP's approach, we will lose some flexibility, because the timestamp
>> >>>>> column may be created after some transformations or a join operation, not
>> >>>>> at the beginning. So why do we have to define rowtime at the beginning?
>> >>>>> (Because of watermarks?) Can we have a way to define rowtime after the
>> >>>>> source table, like TimestampAssigner?
>> >>>>>
>> >>>>> Regarding the "allowLateness" method: I came up with a trick, namely that
>> >>>>> we can make 'rowtime and 'systemtime Scala objects, not symbol expressions.
>> >>>>> The API will look like this:
>> >>>>>
>> >>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>> >>>>>
>> >>>>> The implementation will look like this:
>> >>>>>
>> >>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>> >>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method
>> >>>>>
>> >>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>> >>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // hasn't allowLateness() method
>> >>>>> }
>> >>>>> object rowtime
>> >>>>> object systemtime
>> >>>>>
>> >>>>> What do you think about this?
>> >>>>>
>> >>>>> - Jark Wu
>> >>>>>
>> >>>>> On September 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:
>> >>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> I thought about the API of the FLIP again. If we allow the "systemtime"
>> >>>>>> attribute, we cannot implement a nice method chaining where the user can
>> >>>>>> define an "allowLateness" only on event time. So even if the user expressed
>> >>>>>> that "systemtime" is used we have to offer an "allowLateness" method because
>> >>>>>> we have to assume that this attribute can also be the batch event time
>> >>>>>> column, which is not very nice.
>> >>>>>>
>> >>>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>> >>>>>>     new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
>> >>>>>> }
>> >>>>>>
>> >>>>>> What do you think?
>> >>>>>>
>> >>>>>> Timo
>> >>>>>>
>> >>>>>>
>> >>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>> >>>>>>
>> >>>>>>> Hi Jark,
>> >>>>>>>
>> >>>>>>> you had asked for non-windowed aggregates in the Table API a few times.
>> >>>>>>> FLIP-11 proposes row-window aggregates which are a generalization of
>> >>>>>>> running aggregates (SlideRow unboundedPreceding).
>> >>>>>>>
>> >>>>>>> Can you have a look at the FLIP and give feedback on whether this is what
>> >>>>>>> you are looking for?
>> >>>>>>> Improvement suggestions are very welcome as well.
>> >>>>>>>
>> >>>>>>> Thank you,
>> >>>>>>> Fabian
>> >>>>>>>
>> >>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:
>> >>>>>>>
>> >>>>>>>> Hi all!
>> >>>>>>>>
>> >>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table
>> >>>>>>>> API. You can find the FLIP-11 here:
>> >>>>>>>>
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>> >>>>>>>>
>> >>>>>>>> Motivation for the FLIP:
>> >>>>>>>>
>> >>>>>>>> The Table API is a declarative API to define queries on static and
>> >>>>>>>> streaming tables. So far, only projection, selection, and union are
>> >>>>>>>> supported operations on streaming tables.
>> >>>>>>>>
>> >>>>>>>> This FLIP proposes to add support for different types of aggregations
>> >>>>>>>> on top of streaming tables.
>> >>>>>>>> In particular, we seek to support:
>> >>>>>>>>
>> >>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a
>> >>>>>>>> group of elements. A (time or row-count) window is required to bound the
>> >>>>>>>> infinite input stream into a finite group.
>> >>>>>>>>
>> >>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each
>> >>>>>>>> row, based on a window (range) of preceding and succeeding rows.
>> >>>>>>>>
>> >>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>> >>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>> >>>>>>>> tables.
>> >>>>>>>>
>> >>>>>>>> We are looking forward to your feedback.
>> >>>>>>>>
>> >>>>>>>> Timo
>> >>>>>>>>
>> >>>>>> --
>> >>>>>> Freundliche Grüße / Kind Regards
>> >>>>>>
>> >>>>>> Timo Walther
>> >>>>>>
>> >>>>>> Follow me: @twalthr
>> >>>>>> https://www.linkedin.com/in/twalthr
>> >>>>>>
>> >
>> > --
>> > Freundliche Grüße / Kind Regards
>> >
>> > Timo Walther
>> >
>> > Follow me: @twalthr
>> > https://www.linkedin.com/in/twalthr