Hi all, I have a question about designating the time attribute for `rowtime`. The current design does this during the DataStream-to-Table conversion. Does this mean that `rowtime` is only valid for the source streams and cannot be designated after a subquery? (That's why I considered using an alias to dynamically designate it in a SQL query before; a rough sketch of what I mean follows below.)
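To make the question concrete, the two variants I have in mind look roughly like this. It is only a sketch: the environment setup is assumed, 't.rowtime is the conversion-time marker discussed in this thread (not existing API at this point), and the alias-based designation in variant (b) is purely hypothetical.

    // Sketch only; nothing here is guaranteed to run as-is.
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // a stream that already has timestamps and watermarks assigned
    val stream: DataStream[(Int, String, Long)] = ???

    // (a) designate the event-time attribute during the DataStream-to-Table conversion
    val t1 = stream.toTable(tEnv, 'a, 'b, 't.rowtime)
    tEnv.registerTable("T1", t1)

    // (b) what I was considering: re-designate the time attribute via an alias
    //     after a subquery (hypothetical; this is exactly what I am asking about)
    val t2 = tEnv.sql(
      "SELECT a, maxT AS rowtime FROM (SELECT a, MAX(t) AS maxT FROM T1 GROUP BY a)")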
Best, Xingcan On Wed, Mar 1, 2017 at 5:35 AM, Fabian Hueske <fhue...@gmail.com> wrote: > Hi Jincheng Sun, > > registering watermark functions for different attributes to allow each of > them to be used in a window is an interesting idea. > > However, watermarks only work well if the streaming data is (almost) in > timestamp order. Since it is not possible to sort a stream, all attributes > that would qualify as event-time attributes need to be in almost the same > order. I think this limits the benefits of having multiple watermark > functions quite significantly. But maybe you have a good use case that you > can share where multiple event-time attributes would work well. > > So far our approach has been that a DataStream which is converted into a > Table has already timestamps and watermarks assigned. We also assumed that > a StreamTableSource would provide watermarks and timestamps and indicate > the name of the attribute that carries the timestamp. > > @Stefano: That's great news. I'd suggest to open a pull request and have a > look at PR #3397 which handles the (partitioned) unbounded case. Would be > good to share some code between these approaches. > > Thanks, Fabian > > 2017-02-28 18:17 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>: > > > Hi all, > > > > I have completed a first implementation that works for the SQL query > > SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 > > PRECEDING) AS sumB FROM MyTable > > > > I have SUM, MAX, MIN, AVG, COUNT implemented but I could test it just on > > simple queries such as the one above. Is there any specific case I should > > be looking at? > > > > Regards, > > Stefano > > > > -----Original Message----- > > From: jincheng sun [mailto:sunjincheng...@gmail.com] > > Sent: Tuesday, February 28, 2017 12:26 PM > > To: dev@flink.apache.org > > Subject: Re: [DISCUSS] Table API / SQL indicators for event and > processing > > time > > > > Hi everyone, thanks for sharing your thoughts. I really like Timo’s > > proposal, and I have a few thoughts want to share. > > > > We want to keep the query same for batch and streaming. IMO. “process > time” > > is something special to dataStream while it is not a well defined term > for > > batch query. So it is kind of free to create something new for > processTime. > > I think it is a good idea to add a proctime as a reserved keyword for > SQL. > > > > Regarding to “event time”, it is well defined for batch query. So IMO, > we > > should keep the way of defining a streaming window exactly same as batch > > window. Therefore, the row for event time is nothing special, but just a > > normal column. The major difference between batch and stream is that in > > dataStream the event time column must be associated with a watermark > > function. I really like the way Timo proposed, that we can select any > > column as rowtime. But I think instead of just clarify a column is a > > rowtime (actually I do not think we need this special rowtime keyword), > it > > is better to register/associate the waterMark function to this column > when > > creating the table. For dataStream, we will validate a rowtime column > only > > if it has been associated with the waterMark function. 
A prototype code > to > > explain how it looks like is shown as below: > > > > TableAPI: > > toTable(tEnv, 'a, 'b, 'c) > > .registeredWatermarks('a, waterMarkFunction1) > > > > batchOrStreamTable > > .window(Tumble over 5.milli on 'a as 'w) > > .groupBy('w, 'b) > > .select('b, 'a.count as cnt1, 'c.sum as cnt2) > > > > SQL: > > addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) > > .registeredWatermarks('a, waterMarkFunction1) > > > > SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 > > PRECEDING) AS sumB FROM MyTable > > > > What do you think ? > > > > 2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>: > > > > > Hi everyone, > > > > > > I have create an issue [1] to track the progress of this topic. I have > > > written a little design document [2] how we could implement the > > > indicators and which parts have to be touched. I would suggest to > > > implement a prototype, also to see what is possible and can be > > > integrated both in Flink and Calcite. Feedback is welcome. > > > > > > Regards, > > > Timo > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-5884 > > > [2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tg > > > F1ueOAsFiQwahR72vbc/edit?usp=sharing > > > > > > > > > > > > Am 21/02/17 um 15:06 schrieb Fabian Hueske: > > > > > > Hi Xingcan, > > >> > > >> thanks for your thoughts. > > >> In principle you are right that the monotone attribute property would > > >> be sufficient, however there are more aspects to consider than that. > > >> > > >> Flink is a parallel stream processor engine which means that data is > > >> processed in separate processes and shuffle across them. > > >> Maintaining a strict order when merging parallel streams would be > > >> prohibitively expensive. > > >> Flink's watermark mechanism helps operators to deal with out-of-order > > >> data (due to out-of-order input or shuffles). > > >> I don't think we can separate the discussion about time attributes > > >> from watermarks if we want to use Flink as a processing engine and > > >> not reimplement large parts from scratch. > > >> > > >> When transforming a time attribute, we have to either align it with > > >> existing watermarks or generate new watermarks. > > >> If we want to allow all kinds of monotone transformations, we have to > > >> adapt the watermarks which is not trivial. > > >> Instead, I think we should initially only allow very few monotone > > >> transformations which are aligned with the existing watermarks. We > > >> might later relax this condition if we see that users request this > > feature. > > >> > > >> You are right, that we need to track which attribute can be used as a > > >> time attribute (i.e., is increasing and guarded by watermarks). > > >> For that we need to expose the time attribute when a Table is created > > >> (either when a DataStream is converted like: stream.toTable(tEnv, 'a, > > >> 'b, > > >> 't.rowtime) or in a StreamTableSource) and track how it is used in > > >> queries. > > >> I am not sure if the monotone property would be the right choice > > >> here, since data is only quasi-monotone and a monotone annotation > > >> might trigger some invalid optimizations which change the semantics of > > a query. > > >> Right now, Calcite does not offer a quasi-monotone property (at least > > >> I haven't found it). 
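Concretely, the assumption that timestamps and watermarks are assigned before the conversion, and that the event-time attribute is exposed when the Table is created, would look roughly like this. It is a sketch only: 't.rowtime and the window on 't follow the proposal in this thread and are not existing API at this point.

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val rawStream: DataStream[(Int, String, Long)] = ???

    // the DataStream carries event-time timestamps and watermarks *before*
    // it becomes a Table (here: bounded out-of-orderness of 10 seconds,
    // field _3 holds the event timestamp)
    val withWatermarks = rawStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[(Int, String, Long)](Time.seconds(10)) {
        override def extractTimestamp(e: (Int, String, Long)): Long = e._3
      })

    // expose the event-time attribute when the Table is created (proposed syntax) ...
    val table = withWatermarks.toTable(tEnv, 'a, 'b, 't.rowtime)

    // ... and use it only in ways that keep it aligned with the watermarks,
    // e.g. as the time attribute of a group window (as in the prototype above)
    val result = table
      .window(Tumble over 5.milli on 't as 'w)
      .groupBy('w, 'a)
      .select('a, 'b.count)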
> > >> > > >> Best, Fabian > > >> > > >> > > >> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>: > > >> > > >> Hi all, > > >>> > > >>> As I said in another thread, the main difference between stream and > > >>> table is that a stream is an ordered list while a table is an > > unordered set. > > >>> > > >>> Without considering the out-of-order problem in practice, whether > > >>> event-time or processing-time can be just taken as a monotonically > > >>> increasing field and that's why the given query[1] would work. In > > >>> other words, we must guarantee the "SELECT MAX(t22.rowtime) ..." > > >>> subquery returns a single value that can be retrieved from the > > >>> cached dynamic table since it's dangerous to join two un-windowed > > >>> streams. > > >>> > > >>> Under this circumstance, I just consider adding a "monotonic > > >>> hint"(INC or > > >>> DEC) to the field of a (generalized) table (maybe using an > > >>> annotation on the registerDataXX method) that can be used to > > >>> indicate whether a field is monotonically increasing or decreasing. > > >>> Then by taking rowtime as common (monotonically increasing) field, > > >>> there are several benefits: > > >>> > > >>> 1) This can uniform the table and stream by importing total ordering > > >>> relation to an unordered set. > > >>> > > >>> 2) These fields can be modified arbitrarily as long as they keep the > > >>> declared monotonic feature and the watermark problem does not exist > > >>> any more. > > >>> > > >>> 3) The monotonic hint will be useful in the query optimization > process. > > >>> > > >>> What do you think? > > >>> > > >>> Best, > > >>> Xingcan > > >>> > > >>> [1] > > >>> SELECT t1.amount, t2.rate > > >>> FROM > > >>> table1 AS t1, > > >>> table2 AS t2 > > >>> WHERE > > >>> t1.currency = t2.currency AND > > >>> t2.rowtime = ( > > >>> SELECT MAX(t22.rowtime) > > >>> FROM table2 AS t22 > > >>> AND t22.rowtime <= t1.rowtime) > > >>> > > >>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> > > >>> wrote: > > >>> > > >>> Hi everybody, > > >>>> > > >>>> When Timo wrote to the Calcite mailing list, Julian Hyde replied > > >>>> and gave good advice and explained why a system attribute for > > >>>> event-time would be > > >>>> > > >>> a > > >>> > > >>>> problem [1]. > > >>>> I thought about this and agree with Julian. > > >>>> > > >>>> Here is a document to describe the problem, constraints in Flink > > >>>> and a proposal how to handle processing time and event time in > > >>>> Table API and > > >>>> > > >>> SQL: > > >>> > > >>>> -> > > >>>> https://docs.google.com/document/d/1MDGViWA_ > > >>>> > > >>> TCqpaVoWub7u_GY4PMFSbT8TuaNl- > > >>> > > >>>> EpbTHQ > > >>>> > > >>>> Please have a look, comment and ask questions. > > >>>> > > >>>> Thank you, > > >>>> Fabian > > >>>> > > >>>> [1] > > >>>> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c > > >>>> 6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E > > >>>> > > >>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>: > > >>>> > > >>>> Thanks everybody for the comments. > > >>>>> > > >>>>> Actually, I think we do not have much choice when deciding whether > > >>>>> to > > >>>>> > > >>>> use > > >>> > > >>>> attributes or functions. 
> > >>>>> Consider the following join query: > > >>>>> > > >>>>> SELECT t1.amount, t2.rate > > >>>>> FROM > > >>>>> table1 AS t1, > > >>>>> table2 AS t2 > > >>>>> WHERE > > >>>>> t1.currency = t2.currency AND > > >>>>> t2.rowtime = ( > > >>>>> SELECT MAX(t22.rowtime) > > >>>>> FROM table2 AS t22 > > >>>>> AND t22.rowtime <= t1.rowtime) > > >>>>> > > >>>>> The query joins two streaming tables. Table 1 is a streaming table > > >>>>> with amounts in a certain currency. Table 2 is a (slowly changing) > > >>>>> streaming table of currency exchange rates. > > >>>>> We want to join the amounts stream with the exchange rate of the > > >>>>> corresponding currency that is valid (i.e., last received value -> > > >>>>> MAX(rowtime)) at the rowtime of the amounts row. > > >>>>> In order to specify the query, we need to refer to the rowtime of > > >>>>> the different tables. Hence, we need a way to relate the rowtime > > >>>>> expression > > >>>>> > > >>>> (or > > >>>> > > >>>>> marker) to a table. > > >>>>> This is not possible with a parameterless scalar function. > > >>>>> > > >>>>> I'd like to comment on the concerns regarding the performance: > > >>>>> In fact, the columns could be completely virtual and only exist > > >>>>> during query parsing and validation. > > >>>>> During execution, we can directly access the rowtime metadata of a > > >>>>> > > >>>> Flink > > >>> > > >>>> streaming record (which is present anyway) or look up the current > > >>>>> processing time from the machine clock. So the processing overhead > > >>>>> > > >>>> would > > >>> > > >>>> actually be the same as with a marker function. > > >>>>> > > >>>>> Regarding the question on what should be allowed with a system > > >>>>> > > >>>> attribute: > > >>> > > >>>> IMO, it could be used as any other attribute. We need it at least > > >>>> in > > >>>>> > > >>>> GROUP > > >>>> > > >>>>> BY, ORDER BY, and WHERE to define windows and joins. We could also > > >>>>> > > >>>> allow > > >>> > > >>>> to > > >>>> > > >>>>> access it in SELECT if we want users to give access to rowtime and > > >>>>> processing time. So @Haohui, your query could be supported. > > >>>>> However, what would not be allowed is to modify the value of the > > >>>>> rows, i.e., by naming another column rowtime, i.e., "SELECT > > >>>>> sometimestamp AS rowtime" would not be allowed, because Flink does > > >>>>> not support to modify > > >>>>> > > >>>> the > > >>>> > > >>>>> event time of a row (for good reasons) and processing time should > > >>>>> not > > >>>>> > > >>>> be > > >>> > > >>>> modifiable anyway. > > >>>>> > > >>>>> @Timo: > > >>>>> I think the approach to only use the system columns during parsing > > >>>>> and validation and converting them to expressions afterwards makes > > >>>>> a lot of sense. > > >>>>> The question is how this approach could be nicely integrated with > > >>>>> > > >>>> Calcite. > > >>>> > > >>>>> Best, Fabian > > >>>>> > > >>>>> > > >>>>> > > >>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > > >>>>> > > >>>>> Hi, > > >>>>>> > > >>>>>> My initial thought would be that it makes more sense to thave > > >>>>>> > > >>>>> procTime() > > >>> > > >>>> and rowTime() only as functions which in fact are to be used as > > >>>>>> > > >>>>> markers. > > >>> > > >>>> Having the value (even from special system attributes does not make > > >>>>>> > > >>>>> sense > > >>>> > > >>>>> in some scenario such as the ones for creating windows, e.g., > > >>>>>> If you have SELECT Count(*) OVER (ORDER BY procTime()...) 
If you > > >>>>>> get the value of procTime you cannot do anything as you need > > >>>>>> > > >>>>> the > > >>> > > >>>> marker to know how to construct the window logic. > > >>>>>> > > >>>>>> However, your final idea of having " implement some rule/logic > > >>>>>> that translates the attributes to special RexNodes internally " I > > >>>>>> believe > > >>>>>> > > >>>>> is > > >>> > > >>>> good and gives a solution to both problems. One the one hand for > > >>>> those > > >>>>>> scenarios where you need the value you can access the value, > > >>>>>> while for others you can see the special type of the RexNode and > > >>>>>> use it as a > > >>>>>> > > >>>>> marker. > > >>>> > > >>>>> Regarding keeping this data in a table...i am not sure as you > > >>>>> would > > >>>>>> > > >>>>> say > > >>> > > >>>> we need to augment the data with two fields whether needed or > > >>>>>> > > >>>>> not...this > > >>>> > > >>>>> is nto necessary very efficient > > >>>>>> > > >>>>>> > > >>>>>> Dr. Radu Tudoran > > >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division > > >>>>>> > > >>>>>> > > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > > >>>>>> European Research Center > > >>>>>> Riesstrasse 25, 80992 München > > >>>>>> > > >>>>>> E-mail: radu.tudo...@huawei.com > > >>>>>> Mobile: +49 15209084330 > > >>>>>> Telephone: +49 891588344173 > > >>>>>> > > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > > >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > > >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB > 56063, > > >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN > > >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB > > 56063, > > >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN > > >>>>>> This e-mail and its attachments contain confidential information > > from > > >>>>>> HUAWEI, which is intended only for the person or entity whose > > address > > >>>>>> > > >>>>> is > > >>> > > >>>> listed above. Any use of the information contained herein in any way > > >>>>>> (including, but not limited to, total or partial disclosure, > > >>>>>> > > >>>>> reproduction, > > >>>> > > >>>>> or dissemination) by persons other than the intended recipient(s) > is > > >>>>>> prohibited. If you receive this e-mail in error, please notify the > > >>>>>> > > >>>>> sender > > >>>> > > >>>>> by phone or email immediately and delete it! > > >>>>>> > > >>>>>> -----Original Message----- > > >>>>>> From: Timo Walther [mailto:twal...@apache.org] > > >>>>>> Sent: Wednesday, February 15, 2017 9:33 AM > > >>>>>> To: dev@flink.apache.org > > >>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and > > >>>>>> processing time > > >>>>>> > > >>>>>> Hi all, > > >>>>>> > > >>>>>> at first I also thought that built-in functions (rowtime() and > > >>>>>> proctime()) are the easiest solution. However, I think to be > > >>>>>> > > >>>>> future-proof > > >>>> > > >>>>> we should make them system attributes; esp. to relate them to a > > >>>>>> corresponding table in case of multiple tables. Logically they are > > >>>>>> attributes of each row, which is already done in Table API. > > >>>>>> > > >>>>>> I will ask on the Calcite ML if there is a good way for > integrating > > >>>>>> system attributes. 
Right now, I would propose the following > > >>>>>> > > >>>>> implementation: > > >>>> > > >>>>> - we introduce a custom row type (extending RelDataType) > > >>>>>> - in a streaming environment every row has two attributes by > default > > >>>>>> (rowtime and proctime) > > >>>>>> - we do not allow creating a row type with those attributes (this > > >>>>>> > > >>>>> should > > >>> > > >>>> already prevent `SELECT field AS rowtime FROM ...`) > > >>>>>> - we need to ensure that these attributes are not part of > expansion > > >>>>>> > > >>>>> like > > >>> > > >>>> `SELECT * FROM ...` > > >>>>>> - implement some rule/logic that translates the attributes to > > special > > >>>>>> RexNodes internally, such that the opimizer does not modify these > > >>>>>> > > >>>>> attributes > > >>>> > > >>>>> What do you think? > > >>>>>> > > >>>>>> Regards, > > >>>>>> Timo > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> Am 15/02/17 um 03:36 schrieb Xingcan Cui: > > >>>>>> > > >>>>>>> Hi all, > > >>>>>>> > > >>>>>>> thanks for this thread. > > >>>>>>> > > >>>>>>> @Fabian If I didn't miss the point, the main difference between > the > > >>>>>>> two approaches is whether or not taking these time attributes as > > >>>>>>> common table fields that are directly available to users. > Whatever, > > >>>>>>> these time attributes should be attached to records (right?), and > > >>>>>>> > > >>>>>> the > > >>> > > >>>> discussion lies in whether give them public qualifiers like other > > >>>>>>> common fields or private qualifiers and related get/set methods. > > >>>>>>> > > >>>>>>> The former (system attributes) approach will be more compatible > > with > > >>>>>>> existing SQL read-only operations (e.g., select, join), but we > need > > >>>>>>> > > >>>>>> to > > >>> > > >>>> add restrictions on SQL modification operation (like what?). I think > > >>>>>>> there are no needs to forbid users modifying these attributes via > > >>>>>>> table APIs (like map function). Just inform them about these > > special > > >>>>>>> attribute names like system built in aggregator names in > iteration. > > >>>>>>> > > >>>>>>> As for the built in function approach, I don't know if, for now, > > >>>>>>> > > >>>>>> there > > >>> > > >>>> are functions applied on a single row (maybe the value access > > >>>>>>> functions like COMPOSITE.get(STRING)?). It seems that most of the > > >>>>>>> built in functions work for a single field or on columns and thus > > it > > >>>>>>> will be mountains of work if we want to add a new kind of > function > > >>>>>>> > > >>>>>> to > > >>> > > >>>> SQL. Maybe all existing operations should be modified to support it. > > >>>>>>> > > >>>>>>> All in all, if there are existing supports for single row > function, > > >>>>>>> > > >>>>>> I > > >>> > > >>>> prefer the built in function approach. Otherwise the system > > >>>>>>> > > >>>>>> attributes > > >>> > > >>>> approach should be better. After all there are not so much > > >>>>>>> modification operations in SQL and maybe we can use alias to > > support > > >>>>>>> time attributes setting (just hypothesis, not sure if it's > > >>>>>>> > > >>>>>> feasible). > > >>> > > >>>> @Haohui I think the given query is valid if we add a aggregate > > >>>>>>> function to (PROCTIME() > > >>>>>>> - ROWTIME()) / 1000 and it should be executed efficiently. 
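For example, the latency query from Haohui's mail (quoted further below) would then become something like the following. This is hypothetical only: PROCTIME() and ROWTIME() are the marker functions proposed in PRs #3252/#3271, "MyTable" is a placeholder name, and AVG with a one-minute grouping is just one possible choice of aggregate.

    // tEnv: a StreamTableEnvironment with "MyTable" registered (see the sketches above)
    val latency = tEnv.sql(
      "SELECT AVG((PROCTIME() - ROWTIME()) / 1000) AS avgLatency " +
      "FROM MyTable " +
      "GROUP BY FLOOR(PROCTIME() TO MINUTE)")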
> > >>>>>>> > > >>>>>>> Best, > > >>>>>>> Xingcan > > >>>>>>> > > >>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> > > >>>>>>> > > >>>>>> wrote: > > >>>> > > >>>>> Hi, > > >>>>>>>> > > >>>>>>>> Thanks for starting the discussion. I can see there are multiple > > >>>>>>>> trade-offs in these two approaches. One question I have is that > to > > >>>>>>>> which extent Flink wants to open its APIs to allow users to > access > > >>>>>>>> both processing and event time. > > >>>>>>>> > > >>>>>>>> Before we talk about joins, my understanding for the two > > approaches > > >>>>>>>> that you mentioned are essentially (1) treating the value of > event > > >>>>>>>> > > >>>>>>> / > > >>> > > >>>> processing time as first-class fields for each row, (2) limiting > > >>>>>>>> > > >>>>>>> the > > >>> > > >>>> scope of time indicators to only specifying windows. Take the > > >>>>>>>> following query as an > > >>>>>>>> example: > > >>>>>>>> > > >>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table > GROUP > > >>>>>>>> > > >>>>>>> BY > > >>> > > >>>> FLOOR(PROCTIME() TO MINUTES) > > >>>>>>>> > > >>>>>>>> There are several questions we can ask: > > >>>>>>>> > > >>>>>>>> (1) Is it a valid query? > > >>>>>>>> (2) How efficient the query will be? > > >>>>>>>> > > >>>>>>>> For this query I can see arguments from both sides. I think at > the > > >>>>>>>> end of the day it really comes down to what Flink wants to > > support. > > >>>>>>>> After working on FLINK-5624 I'm more inclined to support the > > second > > >>>>>>>> approach (i.e., built-in functions). The main reason why is that > > >>>>>>>> > > >>>>>>> the > > >>> > > >>>> APIs of Flink are designed to separate times from the real > > >>>>>>>> > > >>>>>>> payloads. > > >>> > > >>>> It probably makes sense for the Table / SQL APIs to have the same > > >>>>>>>> > > >>>>>>> designs. > > >>>>>> > > >>>>>>> For joins I don't have a clear answer on top of my head. Flink > > >>>>>>>> requires two streams to be put in the same window before doing > the > > >>>>>>>> joins. This is essentially a subset of what SQL can express. I > > >>>>>>>> > > >>>>>>> don't > > >>> > > >>>> know what would be the best approach here. > > >>>>>>>> > > >>>>>>>> Regards, > > >>>>>>>> Haohui > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske < > fhue...@gmail.com > > > > > >>>>>>>> > > >>>>>>> wrote: > > >>>>>> > > >>>>>>> Hi, > > >>>>>>>>> > > >>>>>>>>> It would as in the query I gave as an example before: > > >>>>>>>>> > > >>>>>>>>> SELECT > > >>>>>>>>> a, > > >>>>>>>>> SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN > 2 > > >>>>>>>>> PRECEDING AND CURRENT ROW) AS sumB, FROM myStream > > >>>>>>>>> > > >>>>>>>>> Here "proctime" would be a system attribute of the table > > >>>>>>>>> > > >>>>>>>> "myStream". > > >>> > > >>>> The table would also have another system attribute called > > >>>>>>>>> > > >>>>>>>> "rowtime" > > >>> > > >>>> which would be used to indicate event time semantics. > > >>>>>>>>> These attributes would always be present in tables which are > > >>>>>>>>> > > >>>>>>>> derived > > >>> > > >>>> from streams. > > >>>>>>>>> Because we still require that streams have timestamps and > > >>>>>>>>> > > >>>>>>>> watermarks > > >>> > > >>>> assigned (either by the StreamTableSource or the somewhere > > >>>>>>>>> downstream the DataStream program) when they are converted > into a > > >>>>>>>>> table, there is no > > >>>>>>>>> > > >>>>>>>> need > > >>>>>>>> > > >>>>>>>>> to register anything. 
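Put together, the system-attribute variant described above would read roughly as follows. It is a sketch under the assumption (which is exactly the proposal here) that proctime and rowtime are implicit, read-only attributes of every table derived from a stream, so nothing extra has to be registered for them.

    // "myStream" is created from a DataStream that already has timestamps and
    // watermarks assigned (withWatermarks, as in the earlier sketch); under the
    // proposal the proctime/rowtime system attributes come for free.
    tEnv.registerDataStream("myStream", withWatermarks, 'a, 'b, 'c)

    val sumB = tEnv.sql(
      "SELECT a, " +
      "  SUM(b) OVER (PARTITION BY c ORDER BY proctime " +
      "               ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB " +
      "FROM myStream")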
> > >>>>>>>>> > > >>>>>>>>> Does that answer your questions? > > >>>>>>>>> > > >>>>>>>>> Best, Fabian > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran < > radu.tudo...@huawei.com > > >: > > >>>>>>>>> > > >>>>>>>>> Hi Fabian, > > >>>>>>>>>> > > >>>>>>>>>> Thanks for starting the discussion. Before I give my thoughts > on > > >>>>>>>>>> this > > >>>>>>>>>> > > >>>>>>>>> can > > >>>>>>>> > > >>>>>>>>> you please give some examples of how would you see option of > > >>>>>>>>>> > > >>>>>>>>> using > > >>> > > >>>> "system > > >>>>>>>>> > > >>>>>>>>>> attributes"? > > >>>>>>>>>> Do you use this when you register the stream as a table, do > you > > >>>>>>>>>> > > >>>>>>>>> use > > >>> > > >>>> if when you call an SQL query, do you use it when you translate > > >>>>>>>>>> back a > > >>>>>>>>>> > > >>>>>>>>> table > > >>>>>>>> > > >>>>>>>>> to a stream / write it to a dynamic table? > > >>>>>>>>>> > > >>>>>>>>>> Dr. Radu Tudoran > > >>>>>>>>>> Senior Research Engineer - Big Data Expert IT R&D Division > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > > >>>>>>>>>> European Research Center > > >>>>>>>>>> Riesstrasse 25, 80992 München > > >>>>>>>>>> > > >>>>>>>>>> E-mail: radu.tudo...@huawei.com > > >>>>>>>>>> Mobile: +49 15209084330 <+49%201520%209084330> > > >>>>>>>>>> Telephone: +49 891588344173 <+49%2089%201588344173> > > >>>>>>>>>> > > >>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH > > >>>>>>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > > >>>>>>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB > > >>>>>>>>>> > > >>>>>>>>> 56063, > > >>>> > > >>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN > > >>>>>>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB > > >>>>>>>>>> > > >>>>>>>>> 56063, > > >>>> > > >>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN > > >>>>>>>>>> This e-mail and its attachments contain confidential > information > > >>>>>>>>>> > > >>>>>>>>> from > > >>>> > > >>>>> HUAWEI, which is intended only for the person or entity whose > > >>>>>>>>>> > > >>>>>>>>> address > > >>>> > > >>>>> is > > >>>>>>>> > > >>>>>>>>> listed above. Any use of the information contained herein in > any > > >>>>>>>>>> > > >>>>>>>>> way > > >>>> > > >>>>> (including, but not limited to, total or partial disclosure, > > >>>>>>>>>> > > >>>>>>>>> reproduction, > > >>>>>>>>> > > >>>>>>>>>> or dissemination) by persons other than the intended > > recipient(s) > > >>>>>>>>>> > > >>>>>>>>> is > > >>>> > > >>>>> prohibited. If you receive this e-mail in error, please notify > > >>>>>>>>>> > > >>>>>>>>> the > > >>> > > >>>> sender > > >>>>>>>> > > >>>>>>>>> by phone or email immediately and delete it! > > >>>>>>>>>> > > >>>>>>>>>> -----Original Message----- > > >>>>>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com] > > >>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM > > >>>>>>>>>> To: dev@flink.apache.org > > >>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event and > > >>>>>>>>>> > > >>>>>>>>> processing > > >>>>>> > > >>>>>>> time > > >>>>>>>>> > > >>>>>>>>>> Hi, > > >>>>>>>>>> > > >>>>>>>>>> I'd like to start an discussion about how Table API / SQL > > queries > > >>>>>>>>>> > > >>>>>>>>> indicate > > >>>>>>>>> > > >>>>>>>>>> whether an operation is done in event or processing time. > > >>>>>>>>>> > > >>>>>>>>>> 1) Why do we need to indicate the time mode? 
> > >>>>>>>>>> > > >>>>>>>>>> We need to distinguish event time and processing time mode for > > >>>>>>>>>> > > >>>>>>>>> operations > > >>>>>>>> > > >>>>>>>>> in queries in order to have the semantics of a query fully > > >>>>>>>>>> > > >>>>>>>>> defined. > > >>> > > >>>> This cannot be globally done in the TableEnvironment because some > > >>>>>>>>>> > > >>>>>>>>> queries > > >>>>>>>> > > >>>>>>>>> explicitly request an expression such as the ORDER BY clause of > > >>>>>>>>>> > > >>>>>>>>> an > > >>> > > >>>> OVER > > >>>>>> > > >>>>>>> window with PRECEDING / FOLLOWING clauses. > > >>>>>>>>>> So we need a way to specify something like the following > query: > > >>>>>>>>>> > > >>>>>>>>>> SELECT > > >>>>>>>>>> a, > > >>>>>>>>>> SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS > BETWEEN 2 > > >>>>>>>>>> > > >>>>>>>>> PRECEDING > > >>>>>>>> > > >>>>>>>>> AND CURRENT ROW) AS sumB, FROM myStream > > >>>>>>>>>> > > >>>>>>>>>> where "proctime" indicates processing time. Equivalently > > >>>>>>>>>> > > >>>>>>>>> "rowtime" > > >>> > > >>>> would > > >>>>>>>> > > >>>>>>>>> indicate event time. > > >>>>>>>>>> > > >>>>>>>>>> 2) Current state > > >>>>>>>>>> > > >>>>>>>>>> The current master branch implements time support only for > > >>>>>>>>>> > > >>>>>>>>> grouping > > >>> > > >>>> windows in the Table API. > > >>>>>>>>>> Internally, the Table API converts a 'rowtime symbol (which > > looks > > >>>>>>>>>> > > >>>>>>>>> like > > >>>>>> > > >>>>>>> a > > >>>>>>>> > > >>>>>>>>> regular attribute) into a special expression which indicates > > >>>>>>>>>> > > >>>>>>>>> event-time. > > >>>>>>>> > > >>>>>>>>> For example: > > >>>>>>>>>> > > >>>>>>>>>> table > > >>>>>>>>>> .window(Tumble over 5.milli on 'rowtime as 'w) > > >>>>>>>>>> .groupBy('a, 'w) > > >>>>>>>>>> .select(...) > > >>>>>>>>>> > > >>>>>>>>>> defines a tumbling event-time window. > > >>>>>>>>>> > > >>>>>>>>>> Processing-time is indicated by omitting a time attribute > > >>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ). > > >>>>>>>>>> > > >>>>>>>>>> 3) How can we do that in SQL? > > >>>>>>>>>> > > >>>>>>>>>> In SQL we cannot add special expressions without touching the > > >>>>>>>>>> > > >>>>>>>>> parser > > >>>> > > >>>>> which > > >>>>>>>>> > > >>>>>>>>>> we don't want to do because we want to stick to the SQL > > standard. > > >>>>>>>>>> Therefore, I see only two options: adding system attributes or > > >>>>>>>>>> (parameterless) built-in functions. I list some pros and cons > of > > >>>>>>>>>> > > >>>>>>>>> the > > >>>> > > >>>>> approaches below: > > >>>>>>>>>> > > >>>>>>>>>> 1. System Attributes: > > >>>>>>>>>> + most natural way to access a property of a record. > > >>>>>>>>>> + works with joins, because time attributes can be related to > > >>>>>>>>>> > > >>>>>>>>> tables > > >>>> > > >>>>> - We need to ensure the attributes are not writable and always > > >>>>>>>>>> > > >>>>>>>>> present > > >>>>>> > > >>>>>>> in > > >>>>>>>> > > >>>>>>>>> streaming tables (i.e., they should be system defined > > >>>>>>>>>> > > >>>>>>>>> attributes). > > >>> > > >>>> - Need to adapt existing Table API expressions (will not change > > >>>>>>>>>> > > >>>>>>>>> the > > >>> > > >>>> API > > >>>>>> > > >>>>>>> but some parts of the internal translation) > > >>>>>>>>>> - Event time value must be set when the stream is converted, > > >>>>>>>>>> > > >>>>>>>>> processing > > >>>>>> > > >>>>>>> time is evaluated on the fly > > >>>>>>>>>> > > >>>>>>>>>> 2. 
Built-in Functions > > >>>>>>>>>> + Users could try to modify time attributes which is not > > possible > > >>>>>>>>>> > > >>>>>>>>> with > > >>>>>> > > >>>>>>> functions > > >>>>>>>>>> - do not work with joins, because we need to address different > > >>>>>>>>>> > > >>>>>>>>> relations > > >>>>>>>> > > >>>>>>>>> - not a natural way to access a property of a record > > >>>>>>>>>> > > >>>>>>>>>> I think the only viable choice are system attributes, because > > >>>>>>>>>> > > >>>>>>>>> built-in > > >>>>>> > > >>>>>>> functions cannot be used for joins. > > >>>>>>>>>> However, system attributes are the more complex solution > because > > >>>>>>>>>> > > >>>>>>>>> they > > >>>> > > >>>>> need > > >>>>>>>>> > > >>>>>>>>>> a better integration with Calcite's SQL validator (preventing > > >>>>>>>>>> > > >>>>>>>>> user > > >>> > > >>>> attributes which are named rowtime for instance). > > >>>>>>>>>> > > >>>>>>>>>> Since there are currently a several contributions on the way > > >>>>>>>>>> > > >>>>>>>>> (such > > >>> > > >>>> as > > >>>> > > >>>>> SQL > > >>>>>>>> > > >>>>>>>>> OVER windows FLINK-5653 to FLINK-5658) that need time > indicators, > > >>>>>>>>>> > > >>>>>>>>> we > > >>>> > > >>>>> need a > > >>>>>>>>> > > >>>>>>>>>> solution soon to be able to make progress. > > >>>>>>>>>> There are two PRs, #3252 and #3271, which implement the > built-in > > >>>>>>>>>> > > >>>>>>>>> marker > > >>>>>> > > >>>>>>> functions proctime() and rowtime() and which could serve as a > > >>>>>>>>>> > > >>>>>>>>> temporary > > >>>>>> > > >>>>>>> solution (since we do not work on joins yet). > > >>>>>>>>>> I would like to suggest to use these functions as a starting > > >>>>>>>>>> > > >>>>>>>>> point > > >>> > > >>>> (once > > >>>>>>>> > > >>>>>>>>> the PRs are merged) and later change to the system attribute > > >>>>>>>>>> > > >>>>>>>>> solution > > >>>> > > >>>>> which > > >>>>>>>>> > > >>>>>>>>>> needs a bit more time to be implemented. > > >>>>>>>>>> > > >>>>>>>>>> I talked with Timo today about this issue and he said he would > > >>>>>>>>>> > > >>>>>>>>> like > > >>> > > >>>> to > > >>>>>> > > >>>>>>> investigate how we can implement this as system functions > > >>>>>>>>>> > > >>>>>>>>> properly > > >>> > > >>>> integrated with Calcite and the SQL Validator. > > >>>>>>>>>> > > >>>>>>>>>> What do others think? > > >>>>>>>>>> > > >>>>>>>>>> Best, Fabian > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>> > > > > > >
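To make the join argument concrete, this is the kind of query that works with per-table system attributes but cannot be expressed with the parameterless marker functions. It is a sketch of the exchange-rate join discussed earlier in the thread; none of it is implemented yet, and table1/table2 are assumed to be registered streaming tables.

    // With system attributes, the event time of each relation can be addressed
    // individually (t1.rowtime, t2.rowtime):
    tEnv.sql(
      "SELECT t1.amount, t2.rate " +
      "FROM table1 AS t1, table2 AS t2 " +
      "WHERE t1.currency = t2.currency AND " +
      "      t2.rowtime = (SELECT MAX(t22.rowtime) FROM table2 AS t22 " +
      "                    WHERE t22.rowtime <= t1.rowtime)")
    // With a parameterless rowtime() marker function there is no way to say
    // which table's event time is meant, so this join cannot be written.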