Hi,

Thanks for starting the discussion. I can see there are multiple trade-offs between these two approaches. One question I have is to what extent Flink wants to open its APIs to allow users to access both processing and event time.

Before we talk about joins: my understanding is that the two approaches you
mentioned essentially amount to (1) treating the event / processing time
values as first-class fields of each row, or (2) limiting the scope of time
indicators to specifying windows only. Take the following query as an
example:

  SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency
  FROM table
  GROUP BY FLOOR(PROCTIME() TO MINUTES)

There are several questions we can ask: (1) Is it a valid query? (2) How
efficient will the query be?

For this query I can see arguments from both sides. I think at the end of
the day it really comes down to what Flink wants to support. After working
on FLINK-5624 I'm more inclined to support the second approach (i.e.,
built-in functions). The main reason is that Flink's APIs are designed to
separate times from the actual payloads. It probably makes sense for the
Table / SQL APIs to follow the same design.

For joins I don't have a clear answer off the top of my head. Flink requires
two streams to be put in the same window before joining them. This is
essentially a subset of what SQL can express. I don't know what the best
approach would be here.

Regards,
Haohui

On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> It would be as in the query I gave as an example before:
>
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>     PRECEDING AND CURRENT ROW) AS sumB
> FROM myStream
>
> Here "proctime" would be a system attribute of the table "myStream".
> The table would also have another system attribute called "rowtime", which
> would be used to indicate event time semantics.
> These attributes would always be present in tables which are derived from
> streams.
> Because we still require that streams have timestamps and watermarks
> assigned (either by the StreamTableSource or somewhere downstream in the
> DataStream program) when they are converted into a table, there is no need
> to register anything.
>
> Does that answer your questions?
>
> Best, Fabian
>
> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi Fabian,
> >
> > Thanks for starting the discussion. Before I give my thoughts on this,
> > can you please give some examples of how you see the option of using
> > "system attributes"?
> > Do you use this when you register the stream as a table, when you call
> > an SQL query, or when you translate a table back to a stream / write it
> > to a dynamic table?
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert
> > IT R&D Division
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >
> > -----Original Message-----
> > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > Sent: Tuesday, February 14, 2017 1:01 AM
> > To: dev@flink.apache.org
> > Subject: [DISCUSS] Table API / SQL indicators for event and processing time
> >
> > Hi,
> >
> > I'd like to start a discussion about how Table API / SQL queries
> > indicate whether an operation is done in event or processing time.
> >
> > 1) Why do we need to indicate the time mode?
> >
> > We need to distinguish event time and processing time mode for
> > operations in queries in order to have the semantics of a query fully
> > defined.
> > This cannot be done globally in the TableEnvironment because some
> > queries explicitly request an expression such as the ORDER BY clause of
> > an OVER window with PRECEDING / FOLLOWING clauses.
> > So we need a way to specify something like the following query:
> >
> > SELECT
> >   a,
> >   SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 PRECEDING
> >     AND CURRENT ROW) AS sumB
> > FROM myStream
> >
> > where "proctime" indicates processing time. Analogously, "rowtime" would
> > indicate event time.
> >
> > 2) Current state
> >
> > The current master branch implements time support only for grouping
> > windows in the Table API.
> > Internally, the Table API converts a 'rowtime symbol (which looks like a
> > regular attribute) into a special expression which indicates event time.
> > For example:
> >
> > table
> >   .window(Tumble over 5.milli on 'rowtime as 'w)
> >   .groupBy('a, 'w)
> >   .select(...)
> >
> > defines a tumbling event-time window.
> >
> > Processing time is indicated by omitting the time attribute
> > (table.window(Tumble over 5.milli as 'w)).
> >
> > 3) How can we do that in SQL?
> >
> > In SQL we cannot add special expressions without touching the parser,
> > which we don't want to do because we want to stick to the SQL standard.
> > Therefore, I see only two options: adding system attributes or
> > (parameterless) built-in functions.
> > I list some pros and cons of the approaches below:
> >
> > 1. System Attributes:
> > + most natural way to access a property of a record
> > + works with joins, because time attributes can be related to tables
> > - We need to ensure the attributes are not writable and always present
> >   in streaming tables (i.e., they should be system-defined attributes).
> > - Need to adapt existing Table API expressions (will not change the API
> >   but some parts of the internal translation)
> > - Event time value must be set when the stream is converted; processing
> >   time is evaluated on the fly
> >
> > 2. Built-in Functions:
> > + Cannot be modified by users, whereas users could try to modify time
> >   attributes
> > - do not work with joins, because we need to address different relations
> > - not a natural way to access a property of a record
> >
> > I think the only viable choice is system attributes, because built-in
> > functions cannot be used for joins.
> > However, system attributes are the more complex solution because they
> > need better integration with Calcite's SQL validator (preventing user
> > attributes which are named rowtime, for instance).
> >
> > Since there are currently several contributions on the way (such as SQL
> > OVER windows, FLINK-5653 to FLINK-5658) that need time indicators, we
> > need a solution soon to be able to make progress.
> > There are two PRs, #3252 and #3271, which implement the built-in marker
> > functions proctime() and rowtime() and which could serve as a temporary
> > solution (since we do not work on joins yet).
> > I suggest using these functions as a starting point (once the PRs are
> > merged) and later changing to the system attribute solution, which needs
> > a bit more time to be implemented.
> >
> > I talked with Timo today about this issue and he said he would like to
> > investigate how we can implement this as system functions properly
> > integrated with Calcite and the SQL Validator.
> >
> > What do others think?
> >
> > Best, Fabian