Hi Jincheng Sun, registering watermark functions for different attributes, so that each of them can be used in a window, is an interesting idea.
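To make the trade-off concrete, here is a minimal sketch of the single-watermark baseline (roughly against the Flink 1.2-era DataStream and Table APIs; the tuple schema and the 5-second out-of-orderness bound are illustrative assumptions, not part of any proposal): timestamps and watermarks are assigned on the DataStream before it is converted into a Table, so each table carries exactly one event-time attribute.

  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time
  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // Timestamps and watermarks are attached to the DataStream *before* the
  // conversion, so the resulting Table has exactly one event-time attribute ('a).
  val stream: DataStream[(Long, String, Int)] = env
    .fromElements((1000L, "x", 1), (2000L, "y", 2))
    .assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(Long, String, Int)](Time.seconds(5)) {
        override def extractTimestamp(e: (Long, String, Int)): Long = e._1
      })

  val table = stream.toTable(tEnv, 'a, 'b, 'c)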
However, watermarks only work well if the streaming data is (almost) in timestamp order. Since it is not possible to sort a stream, all attributes that would qualify as event-time attributes need to be in almost the same order. I think this limits the benefits of having multiple watermark functions quite significantly. But maybe you have a good use case that you can share where multiple event-time attributes would work well. So far our approach has been that a DataStream which is converted into a Table has already timestamps and watermarks assigned. We also assumed that a StreamTableSource would provide watermarks and timestamps and indicate the name of the attribute that carries the timestamp. @Stefano: That's great news. I'd suggest to open a pull request and have a look at PR #3397 which handles the (partitioned) unbounded case. Would be good to share some code between these approaches. Thanks, Fabian 2017-02-28 18:17 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>: > Hi all, > > I have completed a first implementation that works for the SQL query > SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 > PRECEDING) AS sumB FROM MyTable > > I have SUM, MAX, MIN, AVG, COUNT implemented but I could test it just on > simple queries such as the one above. Is there any specific case I should > be looking at? > > Regards, > Stefano > > -----Original Message----- > From: jincheng sun [mailto:sunjincheng...@gmail.com] > Sent: Tuesday, February 28, 2017 12:26 PM > To: dev@flink.apache.org > Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing > time > > Hi everyone, thanks for sharing your thoughts. I really like Timo’s > proposal, and I have a few thoughts want to share. > > We want to keep the query same for batch and streaming. IMO. “process time” > is something special to dataStream while it is not a well defined term for > batch query. So it is kind of free to create something new for processTime. > I think it is a good idea to add a proctime as a reserved keyword for SQL. > > Regarding to “event time”, it is well defined for batch query. So IMO, we > should keep the way of defining a streaming window exactly same as batch > window. Therefore, the row for event time is nothing special, but just a > normal column. The major difference between batch and stream is that in > dataStream the event time column must be associated with a watermark > function. I really like the way Timo proposed, that we can select any > column as rowtime. But I think instead of just clarify a column is a > rowtime (actually I do not think we need this special rowtime keyword), it > is better to register/associate the waterMark function to this column when > creating the table. For dataStream, we will validate a rowtime column only > if it has been associated with the waterMark function. A prototype code to > explain how it looks like is shown as below: > > TableAPI: > toTable(tEnv, 'a, 'b, 'c) > .registeredWatermarks('a, waterMarkFunction1) > > batchOrStreamTable > .window(Tumble over 5.milli on 'a as 'w) > .groupBy('w, 'b) > .select('b, 'a.count as cnt1, 'c.sum as cnt2) > > SQL: > addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) > .registeredWatermarks('a, waterMarkFunction1) > > SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 > PRECEDING) AS sumB FROM MyTable > > What do you think ? > > 2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>: > > > Hi everyone, > > > > I have create an issue [1] to track the progress of this topic. 
I have > > written a little design document [2] how we could implement the > > indicators and which parts have to be touched. I would suggest to > > implement a prototype, also to see what is possible and can be > > integrated both in Flink and Calcite. Feedback is welcome. > > > > Regards, > > Timo > > > > [1] https://issues.apache.org/jira/browse/FLINK-5884 > > [2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tg > > F1ueOAsFiQwahR72vbc/edit?usp=sharing > > > > > > > > Am 21/02/17 um 15:06 schrieb Fabian Hueske: > > > > Hi Xingcan, > >> > >> thanks for your thoughts. > >> In principle you are right that the monotone attribute property would > >> be sufficient, however there are more aspects to consider than that. > >> > >> Flink is a parallel stream processor engine which means that data is > >> processed in separate processes and shuffle across them. > >> Maintaining a strict order when merging parallel streams would be > >> prohibitively expensive. > >> Flink's watermark mechanism helps operators to deal with out-of-order > >> data (due to out-of-order input or shuffles). > >> I don't think we can separate the discussion about time attributes > >> from watermarks if we want to use Flink as a processing engine and > >> not reimplement large parts from scratch. > >> > >> When transforming a time attribute, we have to either align it with > >> existing watermarks or generate new watermarks. > >> If we want to allow all kinds of monotone transformations, we have to > >> adapt the watermarks which is not trivial. > >> Instead, I think we should initially only allow very few monotone > >> transformations which are aligned with the existing watermarks. We > >> might later relax this condition if we see that users request this > feature. > >> > >> You are right, that we need to track which attribute can be used as a > >> time attribute (i.e., is increasing and guarded by watermarks). > >> For that we need to expose the time attribute when a Table is created > >> (either when a DataStream is converted like: stream.toTable(tEnv, 'a, > >> 'b, > >> 't.rowtime) or in a StreamTableSource) and track how it is used in > >> queries. > >> I am not sure if the monotone property would be the right choice > >> here, since data is only quasi-monotone and a monotone annotation > >> might trigger some invalid optimizations which change the semantics of > a query. > >> Right now, Calcite does not offer a quasi-monotone property (at least > >> I haven't found it). > >> > >> Best, Fabian > >> > >> > >> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>: > >> > >> Hi all, > >>> > >>> As I said in another thread, the main difference between stream and > >>> table is that a stream is an ordered list while a table is an > unordered set. > >>> > >>> Without considering the out-of-order problem in practice, whether > >>> event-time or processing-time can be just taken as a monotonically > >>> increasing field and that's why the given query[1] would work. In > >>> other words, we must guarantee the "SELECT MAX(t22.rowtime) ..." > >>> subquery returns a single value that can be retrieved from the > >>> cached dynamic table since it's dangerous to join two un-windowed > >>> streams. > >>> > >>> Under this circumstance, I just consider adding a "monotonic > >>> hint"(INC or > >>> DEC) to the field of a (generalized) table (maybe using an > >>> annotation on the registerDataXX method) that can be used to > >>> indicate whether a field is monotonically increasing or decreasing. 
> >>> Then by taking rowtime as common (monotonically increasing) field, > >>> there are several benefits: > >>> > >>> 1) This can uniform the table and stream by importing total ordering > >>> relation to an unordered set. > >>> > >>> 2) These fields can be modified arbitrarily as long as they keep the > >>> declared monotonic feature and the watermark problem does not exist > >>> any more. > >>> > >>> 3) The monotonic hint will be useful in the query optimization process. > >>> > >>> What do you think? > >>> > >>> Best, > >>> Xingcan > >>> > >>> [1] > >>> SELECT t1.amount, t2.rate > >>> FROM > >>> table1 AS t1, > >>> table2 AS t2 > >>> WHERE > >>> t1.currency = t2.currency AND > >>> t2.rowtime = ( > >>> SELECT MAX(t22.rowtime) > >>> FROM table2 AS t22 > >>> AND t22.rowtime <= t1.rowtime) > >>> > >>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> > >>> wrote: > >>> > >>> Hi everybody, > >>>> > >>>> When Timo wrote to the Calcite mailing list, Julian Hyde replied > >>>> and gave good advice and explained why a system attribute for > >>>> event-time would be > >>>> > >>> a > >>> > >>>> problem [1]. > >>>> I thought about this and agree with Julian. > >>>> > >>>> Here is a document to describe the problem, constraints in Flink > >>>> and a proposal how to handle processing time and event time in > >>>> Table API and > >>>> > >>> SQL: > >>> > >>>> -> > >>>> https://docs.google.com/document/d/1MDGViWA_ > >>>> > >>> TCqpaVoWub7u_GY4PMFSbT8TuaNl- > >>> > >>>> EpbTHQ > >>>> > >>>> Please have a look, comment and ask questions. > >>>> > >>>> Thank you, > >>>> Fabian > >>>> > >>>> [1] > >>>> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c > >>>> 6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E > >>>> > >>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>: > >>>> > >>>> Thanks everybody for the comments. > >>>>> > >>>>> Actually, I think we do not have much choice when deciding whether > >>>>> to > >>>>> > >>>> use > >>> > >>>> attributes or functions. > >>>>> Consider the following join query: > >>>>> > >>>>> SELECT t1.amount, t2.rate > >>>>> FROM > >>>>> table1 AS t1, > >>>>> table2 AS t2 > >>>>> WHERE > >>>>> t1.currency = t2.currency AND > >>>>> t2.rowtime = ( > >>>>> SELECT MAX(t22.rowtime) > >>>>> FROM table2 AS t22 > >>>>> AND t22.rowtime <= t1.rowtime) > >>>>> > >>>>> The query joins two streaming tables. Table 1 is a streaming table > >>>>> with amounts in a certain currency. Table 2 is a (slowly changing) > >>>>> streaming table of currency exchange rates. > >>>>> We want to join the amounts stream with the exchange rate of the > >>>>> corresponding currency that is valid (i.e., last received value -> > >>>>> MAX(rowtime)) at the rowtime of the amounts row. > >>>>> In order to specify the query, we need to refer to the rowtime of > >>>>> the different tables. Hence, we need a way to relate the rowtime > >>>>> expression > >>>>> > >>>> (or > >>>> > >>>>> marker) to a table. > >>>>> This is not possible with a parameterless scalar function. > >>>>> > >>>>> I'd like to comment on the concerns regarding the performance: > >>>>> In fact, the columns could be completely virtual and only exist > >>>>> during query parsing and validation. > >>>>> During execution, we can directly access the rowtime metadata of a > >>>>> > >>>> Flink > >>> > >>>> streaming record (which is present anyway) or look up the current > >>>>> processing time from the machine clock. 
So the processing overhead > >>>>> > >>>> would > >>> > >>>> actually be the same as with a marker function. > >>>>> > >>>>> Regarding the question on what should be allowed with a system > >>>>> > >>>> attribute: > >>> > >>>> IMO, it could be used as any other attribute. We need it at least > >>>> in > >>>>> > >>>> GROUP > >>>> > >>>>> BY, ORDER BY, and WHERE to define windows and joins. We could also > >>>>> > >>>> allow > >>> > >>>> to > >>>> > >>>>> access it in SELECT if we want users to give access to rowtime and > >>>>> processing time. So @Haohui, your query could be supported. > >>>>> However, what would not be allowed is to modify the value of the > >>>>> rows, i.e., by naming another column rowtime, i.e., "SELECT > >>>>> sometimestamp AS rowtime" would not be allowed, because Flink does > >>>>> not support to modify > >>>>> > >>>> the > >>>> > >>>>> event time of a row (for good reasons) and processing time should > >>>>> not > >>>>> > >>>> be > >>> > >>>> modifiable anyway. > >>>>> > >>>>> @Timo: > >>>>> I think the approach to only use the system columns during parsing > >>>>> and validation and converting them to expressions afterwards makes > >>>>> a lot of sense. > >>>>> The question is how this approach could be nicely integrated with > >>>>> > >>>> Calcite. > >>>> > >>>>> Best, Fabian > >>>>> > >>>>> > >>>>> > >>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > >>>>> > >>>>> Hi, > >>>>>> > >>>>>> My initial thought would be that it makes more sense to thave > >>>>>> > >>>>> procTime() > >>> > >>>> and rowTime() only as functions which in fact are to be used as > >>>>>> > >>>>> markers. > >>> > >>>> Having the value (even from special system attributes does not make > >>>>>> > >>>>> sense > >>>> > >>>>> in some scenario such as the ones for creating windows, e.g., > >>>>>> If you have SELECT Count(*) OVER (ORDER BY procTime()...) If you > >>>>>> get the value of procTime you cannot do anything as you need > >>>>>> > >>>>> the > >>> > >>>> marker to know how to construct the window logic. > >>>>>> > >>>>>> However, your final idea of having " implement some rule/logic > >>>>>> that translates the attributes to special RexNodes internally " I > >>>>>> believe > >>>>>> > >>>>> is > >>> > >>>> good and gives a solution to both problems. One the one hand for > >>>> those > >>>>>> scenarios where you need the value you can access the value, > >>>>>> while for others you can see the special type of the RexNode and > >>>>>> use it as a > >>>>>> > >>>>> marker. > >>>> > >>>>> Regarding keeping this data in a table...i am not sure as you > >>>>> would > >>>>>> > >>>>> say > >>> > >>>> we need to augment the data with two fields whether needed or > >>>>>> > >>>>> not...this > >>>> > >>>>> is nto necessary very efficient > >>>>>> > >>>>>> > >>>>>> Dr. 
Radu Tudoran > >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division > >>>>>> > >>>>>> > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > >>>>>> European Research Center > >>>>>> Riesstrasse 25, 80992 München > >>>>>> > >>>>>> E-mail: radu.tudo...@huawei.com > >>>>>> Mobile: +49 15209084330 > >>>>>> Telephone: +49 891588344173 > >>>>>> > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, > >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN > >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB > 56063, > >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN > >>>>>> This e-mail and its attachments contain confidential information > from > >>>>>> HUAWEI, which is intended only for the person or entity whose > address > >>>>>> > >>>>> is > >>> > >>>> listed above. Any use of the information contained herein in any way > >>>>>> (including, but not limited to, total or partial disclosure, > >>>>>> > >>>>> reproduction, > >>>> > >>>>> or dissemination) by persons other than the intended recipient(s) is > >>>>>> prohibited. If you receive this e-mail in error, please notify the > >>>>>> > >>>>> sender > >>>> > >>>>> by phone or email immediately and delete it! > >>>>>> > >>>>>> -----Original Message----- > >>>>>> From: Timo Walther [mailto:twal...@apache.org] > >>>>>> Sent: Wednesday, February 15, 2017 9:33 AM > >>>>>> To: dev@flink.apache.org > >>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and > >>>>>> processing time > >>>>>> > >>>>>> Hi all, > >>>>>> > >>>>>> at first I also thought that built-in functions (rowtime() and > >>>>>> proctime()) are the easiest solution. However, I think to be > >>>>>> > >>>>> future-proof > >>>> > >>>>> we should make them system attributes; esp. to relate them to a > >>>>>> corresponding table in case of multiple tables. Logically they are > >>>>>> attributes of each row, which is already done in Table API. > >>>>>> > >>>>>> I will ask on the Calcite ML if there is a good way for integrating > >>>>>> system attributes. Right now, I would propose the following > >>>>>> > >>>>> implementation: > >>>> > >>>>> - we introduce a custom row type (extending RelDataType) > >>>>>> - in a streaming environment every row has two attributes by default > >>>>>> (rowtime and proctime) > >>>>>> - we do not allow creating a row type with those attributes (this > >>>>>> > >>>>> should > >>> > >>>> already prevent `SELECT field AS rowtime FROM ...`) > >>>>>> - we need to ensure that these attributes are not part of expansion > >>>>>> > >>>>> like > >>> > >>>> `SELECT * FROM ...` > >>>>>> - implement some rule/logic that translates the attributes to > special > >>>>>> RexNodes internally, such that the opimizer does not modify these > >>>>>> > >>>>> attributes > >>>> > >>>>> What do you think? > >>>>>> > >>>>>> Regards, > >>>>>> Timo > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> Am 15/02/17 um 03:36 schrieb Xingcan Cui: > >>>>>> > >>>>>>> Hi all, > >>>>>>> > >>>>>>> thanks for this thread. > >>>>>>> > >>>>>>> @Fabian If I didn't miss the point, the main difference between the > >>>>>>> two approaches is whether or not taking these time attributes as > >>>>>>> common table fields that are directly available to users. 
Whatever, > >>>>>>> these time attributes should be attached to records (right?), and > >>>>>>> > >>>>>> the > >>> > >>>> discussion lies in whether give them public qualifiers like other > >>>>>>> common fields or private qualifiers and related get/set methods. > >>>>>>> > >>>>>>> The former (system attributes) approach will be more compatible > with > >>>>>>> existing SQL read-only operations (e.g., select, join), but we need > >>>>>>> > >>>>>> to > >>> > >>>> add restrictions on SQL modification operation (like what?). I think > >>>>>>> there are no needs to forbid users modifying these attributes via > >>>>>>> table APIs (like map function). Just inform them about these > special > >>>>>>> attribute names like system built in aggregator names in iteration. > >>>>>>> > >>>>>>> As for the built in function approach, I don't know if, for now, > >>>>>>> > >>>>>> there > >>> > >>>> are functions applied on a single row (maybe the value access > >>>>>>> functions like COMPOSITE.get(STRING)?). It seems that most of the > >>>>>>> built in functions work for a single field or on columns and thus > it > >>>>>>> will be mountains of work if we want to add a new kind of function > >>>>>>> > >>>>>> to > >>> > >>>> SQL. Maybe all existing operations should be modified to support it. > >>>>>>> > >>>>>>> All in all, if there are existing supports for single row function, > >>>>>>> > >>>>>> I > >>> > >>>> prefer the built in function approach. Otherwise the system > >>>>>>> > >>>>>> attributes > >>> > >>>> approach should be better. After all there are not so much > >>>>>>> modification operations in SQL and maybe we can use alias to > support > >>>>>>> time attributes setting (just hypothesis, not sure if it's > >>>>>>> > >>>>>> feasible). > >>> > >>>> @Haohui I think the given query is valid if we add a aggregate > >>>>>>> function to (PROCTIME() > >>>>>>> - ROWTIME()) / 1000 and it should be executed efficiently. > >>>>>>> > >>>>>>> Best, > >>>>>>> Xingcan > >>>>>>> > >>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> > >>>>>>> > >>>>>> wrote: > >>>> > >>>>> Hi, > >>>>>>>> > >>>>>>>> Thanks for starting the discussion. I can see there are multiple > >>>>>>>> trade-offs in these two approaches. One question I have is that to > >>>>>>>> which extent Flink wants to open its APIs to allow users to access > >>>>>>>> both processing and event time. > >>>>>>>> > >>>>>>>> Before we talk about joins, my understanding for the two > approaches > >>>>>>>> that you mentioned are essentially (1) treating the value of event > >>>>>>>> > >>>>>>> / > >>> > >>>> processing time as first-class fields for each row, (2) limiting > >>>>>>>> > >>>>>>> the > >>> > >>>> scope of time indicators to only specifying windows. Take the > >>>>>>>> following query as an > >>>>>>>> example: > >>>>>>>> > >>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP > >>>>>>>> > >>>>>>> BY > >>> > >>>> FLOOR(PROCTIME() TO MINUTES) > >>>>>>>> > >>>>>>>> There are several questions we can ask: > >>>>>>>> > >>>>>>>> (1) Is it a valid query? > >>>>>>>> (2) How efficient the query will be? > >>>>>>>> > >>>>>>>> For this query I can see arguments from both sides. I think at the > >>>>>>>> end of the day it really comes down to what Flink wants to > support. > >>>>>>>> After working on FLINK-5624 I'm more inclined to support the > second > >>>>>>>> approach (i.e., built-in functions). 
The main reason why is that > >>>>>>>> > >>>>>>> the > >>> > >>>> APIs of Flink are designed to separate times from the real > >>>>>>>> > >>>>>>> payloads. > >>> > >>>> It probably makes sense for the Table / SQL APIs to have the same > >>>>>>>> > >>>>>>> designs. > >>>>>> > >>>>>>> For joins I don't have a clear answer on top of my head. Flink > >>>>>>>> requires two streams to be put in the same window before doing the > >>>>>>>> joins. This is essentially a subset of what SQL can express. I > >>>>>>>> > >>>>>>> don't > >>> > >>>> know what would be the best approach here. > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> Haohui > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com > > > >>>>>>>> > >>>>>>> wrote: > >>>>>> > >>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> It would as in the query I gave as an example before: > >>>>>>>>> > >>>>>>>>> SELECT > >>>>>>>>> a, > >>>>>>>>> SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 > >>>>>>>>> PRECEDING AND CURRENT ROW) AS sumB, FROM myStream > >>>>>>>>> > >>>>>>>>> Here "proctime" would be a system attribute of the table > >>>>>>>>> > >>>>>>>> "myStream". > >>> > >>>> The table would also have another system attribute called > >>>>>>>>> > >>>>>>>> "rowtime" > >>> > >>>> which would be used to indicate event time semantics. > >>>>>>>>> These attributes would always be present in tables which are > >>>>>>>>> > >>>>>>>> derived > >>> > >>>> from streams. > >>>>>>>>> Because we still require that streams have timestamps and > >>>>>>>>> > >>>>>>>> watermarks > >>> > >>>> assigned (either by the StreamTableSource or the somewhere > >>>>>>>>> downstream the DataStream program) when they are converted into a > >>>>>>>>> table, there is no > >>>>>>>>> > >>>>>>>> need > >>>>>>>> > >>>>>>>>> to register anything. > >>>>>>>>> > >>>>>>>>> Does that answer your questions? > >>>>>>>>> > >>>>>>>>> Best, Fabian > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com > >: > >>>>>>>>> > >>>>>>>>> Hi Fabian, > >>>>>>>>>> > >>>>>>>>>> Thanks for starting the discussion. Before I give my thoughts on > >>>>>>>>>> this > >>>>>>>>>> > >>>>>>>>> can > >>>>>>>> > >>>>>>>>> you please give some examples of how would you see option of > >>>>>>>>>> > >>>>>>>>> using > >>> > >>>> "system > >>>>>>>>> > >>>>>>>>>> attributes"? > >>>>>>>>>> Do you use this when you register the stream as a table, do you > >>>>>>>>>> > >>>>>>>>> use > >>> > >>>> if when you call an SQL query, do you use it when you translate > >>>>>>>>>> back a > >>>>>>>>>> > >>>>>>>>> table > >>>>>>>> > >>>>>>>>> to a stream / write it to a dynamic table? > >>>>>>>>>> > >>>>>>>>>> Dr. 
Radu Tudoran > >>>>>>>>>> Senior Research Engineer - Big Data Expert IT R&D Division > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > >>>>>>>>>> European Research Center > >>>>>>>>>> Riesstrasse 25, 80992 München > >>>>>>>>>> > >>>>>>>>>> E-mail: radu.tudo...@huawei.com > >>>>>>>>>> Mobile: +49 15209084330 <+49%201520%209084330> > >>>>>>>>>> Telephone: +49 891588344173 <+49%2089%201588344173> > >>>>>>>>>> > >>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > >>>>>>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > >>>>>>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB > >>>>>>>>>> > >>>>>>>>> 56063, > >>>> > >>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN > >>>>>>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB > >>>>>>>>>> > >>>>>>>>> 56063, > >>>> > >>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN > >>>>>>>>>> This e-mail and its attachments contain confidential information > >>>>>>>>>> > >>>>>>>>> from > >>>> > >>>>> HUAWEI, which is intended only for the person or entity whose > >>>>>>>>>> > >>>>>>>>> address > >>>> > >>>>> is > >>>>>>>> > >>>>>>>>> listed above. Any use of the information contained herein in any > >>>>>>>>>> > >>>>>>>>> way > >>>> > >>>>> (including, but not limited to, total or partial disclosure, > >>>>>>>>>> > >>>>>>>>> reproduction, > >>>>>>>>> > >>>>>>>>>> or dissemination) by persons other than the intended > recipient(s) > >>>>>>>>>> > >>>>>>>>> is > >>>> > >>>>> prohibited. If you receive this e-mail in error, please notify > >>>>>>>>>> > >>>>>>>>> the > >>> > >>>> sender > >>>>>>>> > >>>>>>>>> by phone or email immediately and delete it! > >>>>>>>>>> > >>>>>>>>>> -----Original Message----- > >>>>>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com] > >>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM > >>>>>>>>>> To: dev@flink.apache.org > >>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event and > >>>>>>>>>> > >>>>>>>>> processing > >>>>>> > >>>>>>> time > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> > >>>>>>>>>> I'd like to start an discussion about how Table API / SQL > queries > >>>>>>>>>> > >>>>>>>>> indicate > >>>>>>>>> > >>>>>>>>>> whether an operation is done in event or processing time. > >>>>>>>>>> > >>>>>>>>>> 1) Why do we need to indicate the time mode? > >>>>>>>>>> > >>>>>>>>>> We need to distinguish event time and processing time mode for > >>>>>>>>>> > >>>>>>>>> operations > >>>>>>>> > >>>>>>>>> in queries in order to have the semantics of a query fully > >>>>>>>>>> > >>>>>>>>> defined. > >>> > >>>> This cannot be globally done in the TableEnvironment because some > >>>>>>>>>> > >>>>>>>>> queries > >>>>>>>> > >>>>>>>>> explicitly request an expression such as the ORDER BY clause of > >>>>>>>>>> > >>>>>>>>> an > >>> > >>>> OVER > >>>>>> > >>>>>>> window with PRECEDING / FOLLOWING clauses. > >>>>>>>>>> So we need a way to specify something like the following query: > >>>>>>>>>> > >>>>>>>>>> SELECT > >>>>>>>>>> a, > >>>>>>>>>> SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 > >>>>>>>>>> > >>>>>>>>> PRECEDING > >>>>>>>> > >>>>>>>>> AND CURRENT ROW) AS sumB, FROM myStream > >>>>>>>>>> > >>>>>>>>>> where "proctime" indicates processing time. Equivalently > >>>>>>>>>> > >>>>>>>>> "rowtime" > >>> > >>>> would > >>>>>>>> > >>>>>>>>> indicate event time. 
> >>>>>>>>>> > >>>>>>>>>> 2) Current state > >>>>>>>>>> > >>>>>>>>>> The current master branch implements time support only for > >>>>>>>>>> > >>>>>>>>> grouping > >>> > >>>> windows in the Table API. > >>>>>>>>>> Internally, the Table API converts a 'rowtime symbol (which > looks > >>>>>>>>>> > >>>>>>>>> like > >>>>>> > >>>>>>> a > >>>>>>>> > >>>>>>>>> regular attribute) into a special expression which indicates > >>>>>>>>>> > >>>>>>>>> event-time. > >>>>>>>> > >>>>>>>>> For example: > >>>>>>>>>> > >>>>>>>>>> table > >>>>>>>>>> .window(Tumble over 5.milli on 'rowtime as 'w) > >>>>>>>>>> .groupBy('a, 'w) > >>>>>>>>>> .select(...) > >>>>>>>>>> > >>>>>>>>>> defines a tumbling event-time window. > >>>>>>>>>> > >>>>>>>>>> Processing-time is indicated by omitting a time attribute > >>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ). > >>>>>>>>>> > >>>>>>>>>> 3) How can we do that in SQL? > >>>>>>>>>> > >>>>>>>>>> In SQL we cannot add special expressions without touching the > >>>>>>>>>> > >>>>>>>>> parser > >>>> > >>>>> which > >>>>>>>>> > >>>>>>>>>> we don't want to do because we want to stick to the SQL > standard. > >>>>>>>>>> Therefore, I see only two options: adding system attributes or > >>>>>>>>>> (parameterless) built-in functions. I list some pros and cons of > >>>>>>>>>> > >>>>>>>>> the > >>>> > >>>>> approaches below: > >>>>>>>>>> > >>>>>>>>>> 1. System Attributes: > >>>>>>>>>> + most natural way to access a property of a record. > >>>>>>>>>> + works with joins, because time attributes can be related to > >>>>>>>>>> > >>>>>>>>> tables > >>>> > >>>>> - We need to ensure the attributes are not writable and always > >>>>>>>>>> > >>>>>>>>> present > >>>>>> > >>>>>>> in > >>>>>>>> > >>>>>>>>> streaming tables (i.e., they should be system defined > >>>>>>>>>> > >>>>>>>>> attributes). > >>> > >>>> - Need to adapt existing Table API expressions (will not change > >>>>>>>>>> > >>>>>>>>> the > >>> > >>>> API > >>>>>> > >>>>>>> but some parts of the internal translation) > >>>>>>>>>> - Event time value must be set when the stream is converted, > >>>>>>>>>> > >>>>>>>>> processing > >>>>>> > >>>>>>> time is evaluated on the fly > >>>>>>>>>> > >>>>>>>>>> 2. Built-in Functions > >>>>>>>>>> + Users could try to modify time attributes which is not > possible > >>>>>>>>>> > >>>>>>>>> with > >>>>>> > >>>>>>> functions > >>>>>>>>>> - do not work with joins, because we need to address different > >>>>>>>>>> > >>>>>>>>> relations > >>>>>>>> > >>>>>>>>> - not a natural way to access a property of a record > >>>>>>>>>> > >>>>>>>>>> I think the only viable choice are system attributes, because > >>>>>>>>>> > >>>>>>>>> built-in > >>>>>> > >>>>>>> functions cannot be used for joins. > >>>>>>>>>> However, system attributes are the more complex solution because > >>>>>>>>>> > >>>>>>>>> they > >>>> > >>>>> need > >>>>>>>>> > >>>>>>>>>> a better integration with Calcite's SQL validator (preventing > >>>>>>>>>> > >>>>>>>>> user > >>> > >>>> attributes which are named rowtime for instance). > >>>>>>>>>> > >>>>>>>>>> Since there are currently a several contributions on the way > >>>>>>>>>> > >>>>>>>>> (such > >>> > >>>> as > >>>> > >>>>> SQL > >>>>>>>> > >>>>>>>>> OVER windows FLINK-5653 to FLINK-5658) that need time indicators, > >>>>>>>>>> > >>>>>>>>> we > >>>> > >>>>> need a > >>>>>>>>> > >>>>>>>>>> solution soon to be able to make progress. 
> >>>>>>>>>> There are two PRs, #3252 and #3271, which implement the built-in > >>>>>>>>>> > >>>>>>>>> marker > >>>>>> > >>>>>>> functions proctime() and rowtime() and which could serve as a > >>>>>>>>>> > >>>>>>>>> temporary > >>>>>> > >>>>>>> solution (since we do not work on joins yet). > >>>>>>>>>> I would like to suggest to use these functions as a starting > >>>>>>>>>> > >>>>>>>>> point > >>> > >>>> (once > >>>>>>>> > >>>>>>>>> the PRs are merged) and later change to the system attribute > >>>>>>>>>> > >>>>>>>>> solution > >>>> > >>>>> which > >>>>>>>>> > >>>>>>>>>> needs a bit more time to be implemented. > >>>>>>>>>> > >>>>>>>>>> I talked with Timo today about this issue and he said he would > >>>>>>>>>> > >>>>>>>>> like > >>> > >>>> to > >>>>>> > >>>>>>> investigate how we can implement this as system functions > >>>>>>>>>> > >>>>>>>>> properly > >>> > >>>> integrated with Calcite and the SQL Validator. > >>>>>>>>>> > >>>>>>>>>> What do others think? > >>>>>>>>>> > >>>>>>>>>> Best, Fabian > >>>>>>>>>> > >>>>>>>>>> > >>>>>> > > >
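For illustration, here is a sketch of how the temporary marker-function approach could look from the Table API's SQL entry point once the two PRs above are merged (the lowercase proctime() name and the sql() call are assumptions based on this discussion, not a finalized API):

  // tEnv is assumed to be a StreamTableEnvironment with "MyStream" registered.
  // proctime() is the parameterless processing-time marker discussed above and
  // remains hypothetical until PRs #3252/#3271 are merged.
  val sumPerRow = tEnv.sql(
    """
      |SELECT
      |  a,
      |  SUM(b) OVER (PARTITION BY c ORDER BY proctime()
      |               ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
      |FROM MyStream
    """.stripMargin)

Once the system-attribute solution lands, the same query would instead order by a proctime attribute of MyStream, as in Fabian's earlier example.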