Hi all, I have completed a first implementation that works for the SQL query SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB FROM MyTable
I have SUM, MAX, MIN, AVG, and COUNT implemented, but I could only test them on simple queries such as the one above. Is there any specific case I should be looking at? Regards, Stefano -----Original Message----- From: jincheng sun [mailto:sunjincheng...@gmail.com] Sent: Tuesday, February 28, 2017 12:26 PM To: dev@flink.apache.org Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time Hi everyone, thanks for sharing your thoughts. I really like Timo’s proposal, and I have a few thoughts I want to share. We want to keep queries the same for batch and streaming. IMO, “processing time” is something specific to DataStream, while it is not a well-defined term for a batch query. So we are fairly free to create something new for processing time. I think it is a good idea to add proctime as a reserved keyword for SQL. Regarding “event time”, it is well defined for batch queries. So IMO, we should keep the way of defining a streaming window exactly the same as for a batch window. Therefore, the event-time column is nothing special, just a normal column. The major difference between batch and stream is that in DataStream the event-time column must be associated with a watermark function. I really like the way Timo proposed, that we can select any column as rowtime. But I think instead of just declaring that a column is a rowtime (actually, I do not think we need this special rowtime keyword), it is better to register/associate the watermark function with this column when creating the table. For DataStream, we would validate a rowtime column only if it has been associated with a watermark function. 
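For reference when testing implementations like Stefano's, the intended semantics of the OVER RANGE query above can be modeled in a few lines of plain Python (an illustrative reference model, not Flink code — useful for generating expected outputs for the streaming operator):

```python
# Reference model of:
#   SUM(b) OVER (PARTITION BY c ORDER BY a
#                RANGE BETWEEN 2 PRECEDING AND CURRENT ROW)
# For each row, the frame contains all rows of the same partition whose
# order key lies in [a - 2, a].
from collections import defaultdict

def range_over_sum(rows):
    """rows: list of (a, b, c) tuples; returns [(a, sumB), ...] in input order."""
    by_partition = defaultdict(list)
    for a, b, c in rows:
        by_partition[c].append((a, b))
    result = {}
    for c, part in by_partition.items():
        part.sort(key=lambda t: t[0])
        for a, _ in part:
            # RANGE frame: sum over all peers with order key in [a - 2, a]
            result[(c, a)] = sum(b2 for a2, b2 in part if a - 2 <= a2 <= a)
    return [(a, result[(c, a)]) for a, b, c in rows]

rows = [(1, 10, 'x'), (2, 20, 'x'), (4, 40, 'x'), (1, 5, 'y')]
print(range_over_sum(rows))  # [(1, 10), (2, 30), (4, 60), (1, 5)]
```

Edge cases worth checking against such a model: rows whose order keys are more than 2 apart (the frame shrinks to the current row), and multiple partitions interleaved in the input.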
Prototype code showing how this could look: TableAPI: toTable(tEnv, 'a, 'b, 'c) .registeredWatermarks('a, waterMarkFunction1) batchOrStreamTable .window(Tumble over 5.milli on 'a as 'w) .groupBy('w, 'b) .select('b, 'a.count as cnt1, 'c.sum as cnt2) SQL: addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) .registeredWatermarks('a, waterMarkFunction1) SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB FROM MyTable What do you think? 2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>: > Hi everyone, > > I have created an issue [1] to track the progress of this topic. I have > written a little design document [2] describing how we could implement the > indicators and which parts would have to be touched. I would suggest > implementing a prototype, also to see what is possible and what can be > integrated in both Flink and Calcite. Feedback is welcome. > > Regards, > Timo > > [1] https://issues.apache.org/jira/browse/FLINK-5884 > [2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing > > > > Am 21/02/17 um 15:06 schrieb Fabian Hueske: > > Hi Xingcan, >> >> thanks for your thoughts. >> In principle you are right that the monotone attribute property would >> be sufficient; however, there are more aspects to consider than that. >> >> Flink is a parallel stream processing engine, which means that data is >> processed in separate processes and shuffled between them. >> Maintaining a strict order when merging parallel streams would be >> prohibitively expensive. >> Flink's watermark mechanism helps operators to deal with out-of-order >> data (due to out-of-order input or shuffles). >> I don't think we can separate the discussion about time attributes >> from watermarks if we want to use Flink as a processing engine and >> not reimplement large parts from scratch. >> >> When transforming a time attribute, we have to either align it with >> existing watermarks or generate new watermarks. 
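The tumbling-window aggregation in the Table API prototype above can be sketched as a plain-Python reference model (names and the 5 ms window size are taken from the prototype; the bucketing rule — window start = timestamp minus timestamp modulo size — is the usual convention and is assumed here, not quoted from the thread):

```python
# Model of: .window(Tumble over 5.milli on 'a as 'w).groupBy('w, 'b)
#           .select('b, 'a.count as cnt1, 'c.sum as cnt2)
from collections import defaultdict

def tumble_count_sum(rows, size_ms=5):
    """rows: (a_ts, b_key, c_val) tuples; returns {(window_start, b): (count, sum_c)}."""
    acc = defaultdict(lambda: [0, 0])
    for a, b, c in rows:
        w = a - (a % size_ms)   # align event time 'a to a 5 ms tumbling bucket
        acc[(w, b)][0] += 1     # 'a.count as cnt1
        acc[(w, b)][1] += c     # 'c.sum  as cnt2
    return {k: tuple(v) for k, v in acc.items()}

rows = [(0, 'k', 1), (3, 'k', 2), (7, 'k', 4), (12, 'j', 8)]
print(tumble_count_sum(rows))
# {(0, 'k'): (2, 3), (5, 'k'): (1, 4), (10, 'j'): (1, 8)}
```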
>> If we want to allow all kinds of monotone transformations, we have to >> adapt the watermarks which is not trivial. >> Instead, I think we should initially only allow very few monotone >> transformations which are aligned with the existing watermarks. We >> might later relax this condition if we see that users request this feature. >> >> You are right, that we need to track which attribute can be used as a >> time attribute (i.e., is increasing and guarded by watermarks). >> For that we need to expose the time attribute when a Table is created >> (either when a DataStream is converted like: stream.toTable(tEnv, 'a, >> 'b, >> 't.rowtime) or in a StreamTableSource) and track how it is used in >> queries. >> I am not sure if the monotone property would be the right choice >> here, since data is only quasi-monotone and a monotone annotation >> might trigger some invalid optimizations which change the semantics of a >> query. >> Right now, Calcite does not offer a quasi-monotone property (at least >> I haven't found it). >> >> Best, Fabian >> >> >> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>: >> >> Hi all, >>> >>> As I said in another thread, the main difference between stream and >>> table is that a stream is an ordered list while a table is an unordered set. >>> >>> Without considering the out-of-order problem in practice, whether >>> event-time or processing-time can be just taken as a monotonically >>> increasing field and that's why the given query[1] would work. In >>> other words, we must guarantee the "SELECT MAX(t22.rowtime) ..." >>> subquery returns a single value that can be retrieved from the >>> cached dynamic table since it's dangerous to join two un-windowed >>> streams. 
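The watermark mechanism that this discussion keeps returning to can be sketched as a minimal bounded-out-of-orderness generator (an illustrative model, not Flink's API; the bound and the exact lateness rule here are assumptions for the sketch):

```python
# Minimal bounded-out-of-orderness watermark model: the watermark trails the
# maximum observed timestamp by a fixed bound, and an element is considered
# late once its timestamp falls behind the current watermark.
class BoundedOutOfOrdernessWatermarks:
    def __init__(self, max_out_of_orderness):
        self.bound = max_out_of_orderness
        self.max_ts = float('-inf')

    def on_event(self, ts):
        """Observe timestamp ts; return (current watermark, was this event late)."""
        is_late = ts < self.max_ts - self.bound  # behind the watermark?
        self.max_ts = max(self.max_ts, ts)
        return self.max_ts - self.bound, is_late

wm = BoundedOutOfOrdernessWatermarks(2)
print([wm.on_event(t) for t in [5, 7, 4, 10]])
# [(3, False), (5, False), (5, True), (8, False)]
```

This also illustrates Fabian's point: any transformation of the time attribute must preserve the relationship to these watermarks, otherwise "late" would be decided against the wrong clock.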
>>> >>> Under this circumstance, I just consider adding a "monotonic hint" (INC or >>> DEC) to the field of a (generalized) table (maybe using an >>> annotation on the registerDataXX method) that can be used to >>> indicate whether a field is monotonically increasing or decreasing. >>> Then, by taking rowtime as a common (monotonically increasing) field, >>> there are several benefits: >>> >>> 1) This can unify the table and the stream by imposing a total ordering >>> relation on an unordered set. >>> >>> 2) These fields can be modified arbitrarily as long as they keep the >>> declared monotonic property, and the watermark problem does not exist >>> any more. >>> >>> 3) The monotonic hint will be useful in the query optimization process. >>> >>> What do you think? >>> >>> Best, >>> Xingcan >>> >>> [1] >>> SELECT t1.amount, t2.rate >>> FROM >>> table1 AS t1, >>> table2 AS t2 >>> WHERE >>> t1.currency = t2.currency AND >>> t2.rowtime = ( >>> SELECT MAX(t22.rowtime) >>> FROM table2 AS t22 >>> WHERE t22.rowtime <= t1.rowtime) >>> >>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> >>> wrote: >>> >>> Hi everybody, >>>> >>>> When Timo wrote to the Calcite mailing list, Julian Hyde replied, >>>> gave good advice, and explained why a system attribute for >>>> event time would be a >>>> problem [1]. >>>> I thought about this and agree with Julian. >>>> >>>> Here is a document that describes the problem and the constraints in Flink, >>>> with a proposal for how to handle processing time and event time in >>>> Table API and SQL: >>>> -> >>>> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ >>>> >>>> Please have a look, comment and ask questions. 
>>>> >>>> Thank you, >>>> Fabian >>>> >>>> [1] >>>> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E >>>> >>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>: >>>> >>>> Thanks everybody for the comments. >>>>> >>>>> Actually, I think we do not have much choice when deciding whether >>>>> to use >>>>> attributes or functions. >>>>> Consider the following join query: >>>>> >>>>> SELECT t1.amount, t2.rate >>>>> FROM >>>>> table1 AS t1, >>>>> table2 AS t2 >>>>> WHERE >>>>> t1.currency = t2.currency AND >>>>> t2.rowtime = ( >>>>> SELECT MAX(t22.rowtime) >>>>> FROM table2 AS t22 >>>>> WHERE t22.rowtime <= t1.rowtime) >>>>> >>>>> The query joins two streaming tables. Table 1 is a streaming table >>>>> with amounts in a certain currency. Table 2 is a (slowly changing) >>>>> streaming table of currency exchange rates. >>>>> We want to join the amounts stream with the exchange rate of the >>>>> corresponding currency that is valid (i.e., the last received value -> >>>>> MAX(rowtime)) at the rowtime of the amounts row. >>>>> In order to specify the query, we need to refer to the rowtime of >>>>> the different tables. Hence, we need a way to relate the rowtime >>>>> expression (or marker) to a table. >>>>> This is not possible with a parameterless scalar function. >>>>> >>>>> I'd like to comment on the concerns regarding performance: >>>>> In fact, the columns could be completely virtual and only exist >>>>> during query parsing and validation. >>>>> During execution, we can directly access the rowtime metadata of a >>>>> Flink streaming record (which is present anyway) or look up the current >>>>> processing time from the machine clock. So the processing overhead >>>>> would actually be the same as with a marker function. 
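The semantics of this currency join — pick, for each amount row, the rate with the largest rowtime not exceeding the amount's rowtime — can be written out as a small reference model (plain Python, illustrative only; the tuple layouts are assumptions):

```python
# Model of the event-time join: for each amount row, select the exchange
# rate of the same currency with the largest rate rowtime that is still
# <= the amount's rowtime (i.e. the rate valid at that event-time instant).
def join_latest_rate(amounts, rates):
    """amounts: (rowtime, currency, amount); rates: (rowtime, currency, rate)."""
    out = []
    for a_time, cur, amount in amounts:
        valid = [(r_time, rate) for r_time, r_cur, rate in rates
                 if r_cur == cur and r_time <= a_time]
        if valid:
            _, rate = max(valid)  # tuple max picks MAX(t22.rowtime)
            out.append((amount, rate))
    return out

rates = [(1, 'EUR', 1.10), (5, 'EUR', 1.20), (1, 'USD', 1.00)]
amounts = [(4, 'EUR', 100), (6, 'EUR', 200), (2, 'USD', 50)]
print(join_latest_rate(amounts, rates))  # [(100, 1.1), (200, 1.2), (50, 1.0)]
```

Note how the model needs the rowtime of *both* tables (`a_time` and `r_time`) — which is exactly Fabian's argument for why a parameterless marker function cannot express this join.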
>>>>> >>>>> Regarding the question of what should be allowed with a system >>>>> attribute: >>>>> IMO, it could be used like any other attribute. We need it at least in >>>>> GROUP BY, ORDER BY, and WHERE to define windows and joins. We could also >>>>> allow accessing it in SELECT if we want to give users access to rowtime and >>>>> processing time. So @Haohui, your query could be supported. >>>>> However, what would not be allowed is to modify the value of the >>>>> rows, e.g., by naming another column rowtime: "SELECT >>>>> sometimestamp AS rowtime" would not be allowed, because Flink does >>>>> not support modifying the >>>>> event time of a row (for good reasons), and processing time should >>>>> not be >>>>> modifiable anyway. >>>>> >>>>> @Timo: >>>>> I think the approach to only use the system columns during parsing >>>>> and validation and convert them to expressions afterwards makes >>>>> a lot of sense. >>>>> The question is how this approach could be nicely integrated with >>>>> Calcite. >>>>> >>>>> Best, Fabian >>>>> >>>>> >>>>> >>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: >>>>> >>>>> Hi, >>>>>> >>>>>> My initial thought would be that it makes more sense to have >>>>>> procTime() and rowTime() only as functions, which are in fact to be used as >>>>>> markers. Having the value (even from special system attributes) does not make >>>>>> sense in some scenarios, such as the ones for creating windows. E.g., >>>>>> if you have SELECT COUNT(*) OVER (ORDER BY procTime() ...) and you >>>>>> get the value of procTime, you cannot do anything with it, as you need >>>>>> the marker to know how to construct the window logic. 
>>>>>> >>>>>> However, your final idea of having "implement some rule/logic >>>>>> that translates the attributes to special RexNodes internally" I >>>>>> believe is good and gives a solution to both problems. On the one hand, for >>>>>> those scenarios where you need the value, you can access the value, >>>>>> while for others you can see the special type of the RexNode and >>>>>> use it as a marker. >>>>>> Regarding keeping this data in a table... I am not sure, as you say >>>>>> we would need to augment the data with two fields whether needed or not... this >>>>>> is not necessarily very efficient. >>>>>> >>>>>> >>>>>> Dr. Radu Tudoran >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division >>>>>> >>>>>> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH >>>>>> European Research Center >>>>>> Riesstrasse 25, 80992 München >>>>>> >>>>>> E-mail: radu.tudo...@huawei.com >>>>>> Mobile: +49 15209084330 >>>>>> Telephone: +49 891588344173 >>>>>> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN >>>>>> This e-mail and its attachments contain confidential information from >>>>>> HUAWEI, which is intended only for the person or entity whose address is >>>>>> listed above. Any use of the information contained herein in any way >>>>>> (including, but not limited to, total or partial disclosure, reproduction, >>>>>> or dissemination) by persons other than the intended recipient(s) is >>>>>> prohibited. If you receive this e-mail in error, please notify the sender >>>>>> by phone or email immediately and delete it! 
>>>>>> >>>>>> -----Original Message----- >>>>>> From: Timo Walther [mailto:twal...@apache.org] >>>>>> Sent: Wednesday, February 15, 2017 9:33 AM >>>>>> To: dev@flink.apache.org >>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and >>>>>> processing time >>>>>> >>>>>> Hi all, >>>>>> >>>>>> at first I also thought that built-in functions (rowtime() and >>>>>> proctime()) are the easiest solution. However, I think to be future-proof >>>>>> we should make them system attributes; esp. to relate them to a >>>>>> corresponding table in case of multiple tables. Logically they are >>>>>> attributes of each row, which is how it is already done in the Table API. >>>>>> >>>>>> I will ask on the Calcite ML if there is a good way of integrating >>>>>> system attributes. Right now, I would propose the following implementation: >>>>>> - we introduce a custom row type (extending RelDataType) >>>>>> - in a streaming environment every row has two attributes by default >>>>>> (rowtime and proctime) >>>>>> - we do not allow creating a row type with those attributes (this should >>>>>> already prevent `SELECT field AS rowtime FROM ...`) >>>>>> - we need to ensure that these attributes are not part of expansions like >>>>>> `SELECT * FROM ...` >>>>>> - implement some rule/logic that translates the attributes to special >>>>>> RexNodes internally, such that the optimizer does not modify these >>>>>> attributes >>>>>> >>>>>> What do you think? >>>>>> >>>>>> Regards, >>>>>> Timo >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> Am 15/02/17 um 03:36 schrieb Xingcan Cui: >>>>>> >>>>>>> Hi all, >>>>>>> >>>>>>> thanks for this thread. >>>>>>> >>>>>>> @Fabian If I didn't miss the point, the main difference between the >>>>>>> two approaches is whether or not to take these time attributes as >>>>>>> common table fields that are directly available to users. 
Either way, >>>>>>> these time attributes should be attached to records (right?), and the >>>>>>> discussion lies in whether to give them public qualifiers like other >>>>>>> common fields, or private qualifiers and related get/set methods. >>>>>>> >>>>>>> The former (system attributes) approach will be more compatible with >>>>>>> existing SQL read-only operations (e.g., select, join), but we need to >>>>>>> add restrictions on SQL modification operations (like what?). I think >>>>>>> there is no need to forbid users from modifying these attributes via >>>>>>> Table API (like a map function). Just inform them about these special >>>>>>> attribute names, like the built-in system aggregator names in iterations. >>>>>>> >>>>>>> As for the built-in function approach, I don't know if, for now, there >>>>>>> are functions applied to a single row (maybe the value access >>>>>>> functions like COMPOSITE.get(STRING)?). It seems that most of the >>>>>>> built-in functions work on a single field or on columns, and thus it >>>>>>> will be mountains of work if we want to add a new kind of function to >>>>>>> SQL. Maybe all existing operations would have to be modified to support it. >>>>>>> >>>>>>> All in all, if there is existing support for single-row functions, I >>>>>>> prefer the built-in function approach. Otherwise the system attributes >>>>>>> approach should be better. After all, there are not so many >>>>>>> modification operations in SQL, and maybe we can use aliases to support >>>>>>> setting time attributes (just a hypothesis; not sure if it's feasible). >>>>>>> >>>>>>> @Haohui I think the given query is valid if we add an aggregate >>>>>>> function to (PROCTIME() >>>>>>> - ROWTIME()) / 1000, and it should be executed efficiently. 
>>>>>>> >>>>>>> Best, >>>>>>> Xingcan >>>>>>> >>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> >>>>>>> wrote: >>>> >>>>> Hi, >>>>>>>> >>>>>>>> Thanks for starting the discussion. I can see there are multiple >>>>>>>> trade-offs in these two approaches. One question I have is to >>>>>>>> what extent Flink wants to open its APIs to allow users to access >>>>>>>> both processing and event time. >>>>>>>> >>>>>>>> Before we talk about joins, my understanding of the two approaches >>>>>>>> that you mentioned is essentially (1) treating the value of event / >>>>>>>> processing time as first-class fields for each row, (2) limiting the >>>>>>>> scope of time indicators to only specifying windows. Take the >>>>>>>> following query as an >>>>>>>> example: >>>>>>>> >>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY >>>>>>>> FLOOR(PROCTIME() TO MINUTES) >>>>>>>> >>>>>>>> There are several questions we can ask: >>>>>>>> >>>>>>>> (1) Is it a valid query? >>>>>>>> (2) How efficient will the query be? >>>>>>>> >>>>>>>> For this query I can see arguments from both sides. I think at the >>>>>>>> end of the day it really comes down to what Flink wants to support. >>>>>>>> After working on FLINK-5624 I'm more inclined to support the second >>>>>>>> approach (i.e., built-in functions). The main reason is that the >>>>>>>> APIs of Flink are designed to separate times from the real payloads. >>>>>>>> It probably makes sense for the Table / SQL APIs to have the same >>>>>>>> designs. >>>>>>>> For joins I don't have a clear answer off the top of my head. Flink >>>>>>>> requires two streams to be put in the same window before doing the >>>>>>>> joins. This is essentially a subset of what SQL can express. I don't >>>>>>>> know what would be the best approach here. 
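Haohui's latency query can be given a concrete reference model (plain Python; following Xingcan's later remark, an aggregate — AVG here — is assumed around the latency expression, since the original query lacks one):

```python
# Model of:
#   SELECT AVG((PROCTIME() - ROWTIME()) / 1000) AS latency
#   FROM table GROUP BY FLOOR(PROCTIME() TO MINUTES)
# Timestamps are in milliseconds; latency is reported in seconds.
from collections import defaultdict

def avg_latency_per_minute(events):
    """events: (proctime_ms, rowtime_ms) pairs; returns {minute: avg latency (s)}."""
    buckets = defaultdict(list)
    for proc, row in events:
        buckets[proc // 60000].append((proc - row) / 1000)  # FLOOR to minutes
    return {m: sum(v) / len(v) for m, v in buckets.items()}

events = [(61000, 60000), (62000, 59000), (125000, 120000)]
print(avg_latency_per_minute(events))  # {1: 2.0, 2: 5.0}
```

This makes the two questions in the thread concrete: validity hinges on whether both time indicators may appear in SELECT, and efficiency hinges on PROCTIME() being evaluated on the fly rather than materialized per row.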
>>>>>>>> >>>>>>>> Regards, >>>>>>>> Haohui >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> >>>>>>>> wrote: >>>>>> >>>>>>> Hi, >>>>>>>>> >>>>>>>>> It would work as in the query I gave as an example before: >>>>>>>>> >>>>>>>>> SELECT >>>>>>>>> a, >>>>>>>>> SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 >>>>>>>>> PRECEDING AND CURRENT ROW) AS sumB >>>>>>>>> FROM myStream >>>>>>>>> >>>>>>>>> Here "proctime" would be a system attribute of the table "myStream". >>>>>>>>> The table would also have another system attribute called "rowtime" >>>>>>>>> which would be used to indicate event time semantics. >>>>>>>>> These attributes would always be present in tables which are derived >>>>>>>>> from streams. >>>>>>>>> Because we still require that streams have timestamps and watermarks >>>>>>>>> assigned (either by the StreamTableSource or somewhere in the >>>>>>>>> DataStream program) when they are converted into a >>>>>>>>> table, there is no need >>>>>>>>> to register anything. >>>>>>>>> >>>>>>>>> Does that answer your questions? >>>>>>>>> >>>>>>>>> Best, Fabian >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: >>>>>>>>> >>>>>>>>> Hi Fabian, >>>>>>>>>> >>>>>>>>>> Thanks for starting the discussion. Before I give my thoughts on >>>>>>>>>> this, can >>>>>>>>>> you please give some examples of how you would see the option of using >>>>>>>>>> "system attributes"? >>>>>>>>>> Do you use this when you register the stream as a table, do you use >>>>>>>>>> it when you call a SQL query, and do you use it when you translate a >>>>>>>>>> table back >>>>>>>>>> to a stream / write it to a dynamic table? >>>>>>>>>> >>>>>>>>>> Dr. 
Radu Tudoran >>>>>>>>>> Senior Research Engineer - Big Data Expert IT R&D Division >>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH, European Research Center, Riesstrasse 25, 80992 München 
>>>>>>>>>> >>>>>>>>>> -----Original Message----- >>>>>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com] >>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM >>>>>>>>>> To: dev@flink.apache.org >>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event and >>>>>>>>>> processing time >>>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> I'd like to start a discussion about how Table API / SQL queries >>>>>>>>>> indicate >>>>>>>>>> whether an operation is done in event or processing time. >>>>>>>>>> >>>>>>>>>> 1) Why do we need to indicate the time mode? >>>>>>>>>> >>>>>>>>>> We need to distinguish event-time and processing-time mode for >>>>>>>>>> operations >>>>>>>>>> in queries in order to have the semantics of a query fully defined. >>>>>>>>>> This cannot be done globally in the TableEnvironment because some >>>>>>>>>> queries >>>>>>>>>> explicitly request an expression, such as the ORDER BY clause of an >>>>>>>>>> OVER >>>>>>>>>> window with PRECEDING / FOLLOWING clauses. >>>>>>>>>> So we need a way to specify something like the following query: >>>>>>>>>> >>>>>>>>>> SELECT >>>>>>>>>> a, >>>>>>>>>> SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 >>>>>>>>>> PRECEDING >>>>>>>>>> AND CURRENT ROW) AS sumB >>>>>>>>>> FROM myStream >>>>>>>>>> >>>>>>>>>> where "proctime" indicates processing time. Equivalently, "rowtime" >>>>>>>>>> would >>>>>>>>>> indicate event time. >>>>>>>>>> >>>>>>>>>> 2) Current state >>>>>>>>>> >>>>>>>>>> The current master branch implements time support only for grouping >>>>>>>>>> windows in the Table API. >>>>>>>>>> Internally, the Table API converts a 'rowtime symbol (which looks like >>>>>>>>>> a >>>>>>>>>> regular attribute) into a special expression which indicates >>>>>>>>>> event time. 
>>>>>>>> >>>>>>>>> For example: >>>>>>>>>> >>>>>>>>>> table >>>>>>>>>> .window(Tumble over 5.milli on 'rowtime as 'w) >>>>>>>>>> .groupBy('a, 'w) >>>>>>>>>> .select(...) >>>>>>>>>> >>>>>>>>>> defines a tumbling event-time window. >>>>>>>>>> >>>>>>>>>> Processing-time is indicated by omitting a time attribute >>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ). >>>>>>>>>> >>>>>>>>>> 3) How can we do that in SQL? >>>>>>>>>> >>>>>>>>>> In SQL we cannot add special expressions without touching the >>>>>>>>>> >>>>>>>>> parser >>>> >>>>> which >>>>>>>>> >>>>>>>>>> we don't want to do because we want to stick to the SQL standard. >>>>>>>>>> Therefore, I see only two options: adding system attributes or >>>>>>>>>> (parameterless) built-in functions. I list some pros and cons of >>>>>>>>>> >>>>>>>>> the >>>> >>>>> approaches below: >>>>>>>>>> >>>>>>>>>> 1. System Attributes: >>>>>>>>>> + most natural way to access a property of a record. >>>>>>>>>> + works with joins, because time attributes can be related to >>>>>>>>>> >>>>>>>>> tables >>>> >>>>> - We need to ensure the attributes are not writable and always >>>>>>>>>> >>>>>>>>> present >>>>>> >>>>>>> in >>>>>>>> >>>>>>>>> streaming tables (i.e., they should be system defined >>>>>>>>>> >>>>>>>>> attributes). >>> >>>> - Need to adapt existing Table API expressions (will not change >>>>>>>>>> >>>>>>>>> the >>> >>>> API >>>>>> >>>>>>> but some parts of the internal translation) >>>>>>>>>> - Event time value must be set when the stream is converted, >>>>>>>>>> >>>>>>>>> processing >>>>>> >>>>>>> time is evaluated on the fly >>>>>>>>>> >>>>>>>>>> 2. 
Built-in Functions >>>>>>>>>> + Users cannot modify time attributes, since modifying the result of a >>>>>>>>>> function is not possible >>>>>>>>>> - do not work with joins, because we need to address different >>>>>>>>>> relations >>>>>>>>>> - not a natural way to access a property of a record >>>>>>>>>> >>>>>>>>>> I think the only viable choice is system attributes, because built-in >>>>>>>>>> functions cannot be used for joins. >>>>>>>>>> However, system attributes are the more complex solution because they >>>>>>>>>> need >>>>>>>>>> a better integration with Calcite's SQL validator (preventing user >>>>>>>>>> attributes named rowtime, for instance). >>>>>>>>>> >>>>>>>>>> Since there are currently several contributions on the way (such as >>>>>>>>>> SQL >>>>>>>>>> OVER windows, FLINK-5653 to FLINK-5658) that need time indicators, we >>>>>>>>>> need a >>>>>>>>>> solution soon to be able to make progress. >>>>>>>>>> There are two PRs, #3252 and #3271, which implement the built-in >>>>>>>>>> marker >>>>>>>>>> functions proctime() and rowtime() and which could serve as a >>>>>>>>>> temporary >>>>>>>>>> solution (since we do not work on joins yet). >>>>>>>>>> I would like to suggest using these functions as a starting point >>>>>>>>>> (once >>>>>>>>>> the PRs are merged) and later changing to the system attribute >>>>>>>>>> solution, which >>>>>>>>>> needs a bit more time to be implemented. >>>>>>>>>> >>>>>>>>>> I talked with Timo today about this issue and he said he would >>>>>>>>>> like to >>>>>>>>>> investigate how we can implement this as system functions properly >>>>>>>>>> integrated with Calcite and the SQL Validator. 
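The "read-only system attribute" behaviour debated in this thread — rowtime fixed at ingestion, proctime evaluated on access, neither assignable — can be sketched in plain Python (class and attribute names are illustrative, not anything from Flink):

```python
# Sketch of read-only system attributes on a stream row: rowtime is fixed
# when the row is created, proctime is read from the clock on each access,
# and assigning either one is rejected (mirroring the rule that
# "SELECT sometimestamp AS rowtime" would not be allowed).
import time

class StreamRow:
    SYSTEM_ATTRIBUTES = frozenset({'rowtime', 'proctime'})

    def __init__(self, rowtime, **fields):
        object.__setattr__(self, '_rowtime', rowtime)  # bypass the guard once
        for k, v in fields.items():
            setattr(self, k, v)

    @property
    def rowtime(self):      # event time: set at ingestion, then immutable
        return self._rowtime

    @property
    def proctime(self):     # processing time: evaluated on the fly
        return time.time()

    def __setattr__(self, name, value):
        if name in self.SYSTEM_ATTRIBUTES:
            raise AttributeError(f"'{name}' is a read-only system attribute")
        object.__setattr__(self, name, value)

row = StreamRow(rowtime=1234, amount=100)
print(row.rowtime)   # 1234
try:
    row.rowtime = 0
except AttributeError as e:
    print(e)         # 'rowtime' is a read-only system attribute
```

The validator-level enforcement Fabian and Timo discuss would play the role of `__setattr__` here: the attribute is freely readable in queries but any attempt to (re)define it is rejected at validation time.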
>>>>>>>>>> >>>>>>>>>> What do others think? >>>>>>>>>> >>>>>>>>>> Best, Fabian >>>>>>>>>> >>>>>>>>>> >>>>>> >