Hi everyone, thanks for sharing your thoughts. I really like Timo’s
proposal, and I have a few thoughts I want to share.

We want to keep queries the same for batch and streaming. IMO, “processing
time” is specific to DataStream and is not a well-defined term for batch
queries, so we are fairly free to introduce something new for it. I think
it is a good idea to add proctime as a reserved keyword for SQL.

Regarding “event time”, it is well defined for batch queries. So IMO, we
should define a streaming window in exactly the same way as a batch
window. The event-time column is then nothing special, just a normal
column. The major difference between batch and streaming is that in
DataStream the event-time column must be associated with a watermark
function. I really like Timo’s proposal that any column can be selected as
rowtime. But instead of just declaring that a column is a rowtime
(actually, I do not think we need a special rowtime keyword), I think it
is better to register the watermark function for this column when
creating the table. For DataStream, we would accept a rowtime column only
if it has been associated with a watermark function. Some prototype code
showing how this could look:

  TableAPI:
     toTable(tEnv, 'a, 'b, 'c)
      .registeredWatermarks('a, waterMarkFunction1)

     batchOrStreamTable
      .window(Tumble over 5.milli on 'a as 'w)
      .groupBy('w, 'b)
      .select('b, 'a.count as 'cnt1, 'c.sum as 'cnt2)

  SQL:
    addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
      .registeredWatermarks('a, waterMarkFunction1)

    SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a
      RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
    FROM MyTable
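
To make the validation rule a bit more concrete, here is a rough sketch in
plain Python (not Flink code). Table, register_watermark,
check_window_column and bounded_lateness are names made up for
illustration; register_watermark stands in for the registeredWatermarks
call proposed above, and the watermark function signature (largest event
time seen so far in, watermark out) is an assumption:

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Tuple

# Assumed signature: largest event time seen so far -> current watermark.
WatermarkFn = Callable[[int], int]


@dataclass
class Table:
    """Toy stand-in for a batch or streaming table (not a Flink class)."""
    columns: Tuple[str, ...]
    is_streaming: bool
    watermarks: Dict[str, WatermarkFn] = field(default_factory=dict)

    def register_watermark(self, column: str, fn: WatermarkFn) -> "Table":
        # Hypothetical counterpart of .registeredWatermarks('a, fn).
        if column not in self.columns:
            raise ValueError(f"unknown column: {column}")
        self.watermarks[column] = fn
        return self

    def check_window_column(self, column: str) -> None:
        # Batch: any existing column may be used to define a window.
        # Streaming: the column must also have a watermark function.
        if column not in self.columns:
            raise ValueError(f"unknown column: {column}")
        if self.is_streaming and column not in self.watermarks:
            raise ValueError(f"no watermark registered for column: {column}")


def bounded_lateness(max_delay: int) -> WatermarkFn:
    # Example watermark function: tolerate events up to max_delay late.
    return lambda max_seen: max_seen - max_delay


batch = Table(("a", "b", "c"), is_streaming=False)
stream = Table(("a", "b", "c"), is_streaming=True)

batch.check_window_column("a")        # accepted: plain column in batch
stream.register_watermark("a", bounded_lateness(5))
stream.check_window_column("a")       # accepted: watermark registered

try:
    stream.check_window_column("c")   # rejected: no watermark on 'c'
    rejected = False
except ValueError:
    rejected = True
print(rejected)
```

The only point of the sketch is that batch and streaming tables share one
window definition, and streaming adds a single extra check.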

What do you think?

2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>:

> Hi everyone,
>
> I have created an issue [1] to track the progress of this topic. I have
> written a short design document [2] on how we could implement the
> indicators and which parts have to be touched. I would suggest implementing
> a prototype, also to see what is possible and can be integrated in both
> Flink and Calcite. Feedback is welcome.
>
> Regards,
> Timo
>
> [1] https://issues.apache.org/jira/browse/FLINK-5884
> [2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing
>
>
>
> On 21/02/17 at 15:06, Fabian Hueske wrote:
>
> Hi Xingcan,
>>
>> thanks for your thoughts.
>> In principle you are right that the monotone attribute property would be
>> sufficient; however, there are more aspects to consider than that.
>>
>> Flink is a parallel stream processing engine, which means that data is
>> processed in separate processes and shuffled across them.
>> Maintaining a strict order when merging parallel streams would be
>> prohibitively expensive.
>> Flink's watermark mechanism helps operators deal with out-of-order data
>> (due to out-of-order input or shuffles).
>> I don't think we can separate the discussion about time attributes from
>> watermarks if we want to use Flink as a processing engine and not
>> reimplement large parts from scratch.
>>
>> When transforming a time attribute, we have to either align it with
>> existing watermarks or generate new watermarks.
>> If we want to allow all kinds of monotone transformations, we have to
>> adapt the watermarks, which is not trivial.
>> Instead, I think we should initially allow only a few monotone
>> transformations that are aligned with the existing watermarks. We might
>> later relax this condition if we see that users request this feature.
>>
>> You are right that we need to track which attribute can be used as a time
>> attribute (i.e., is increasing and guarded by watermarks).
>> For that we need to expose the time attribute when a Table is created
>> (either when a DataStream is converted, like stream.toTable(tEnv, 'a, 'b,
>> 't.rowtime), or in a StreamTableSource) and track how it is used in
>> queries.
>> I am not sure the monotone property would be the right choice here,
>> since the data is only quasi-monotone and a monotone annotation might
>> trigger invalid optimizations that change the semantics of a query.
>> Right now, Calcite does not offer a quasi-monotone property (at least I
>> haven't found one).
>>
>> Best, Fabian
>>
>>
>> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
>>
>> Hi all,
>>>
>>> As I said in another thread, the main difference between stream and table
>>> is that a stream is an ordered list while a table is an unordered set.
>>>
>>> Without considering the out-of-order problem in practice, both event
>>> time and processing time can simply be treated as a monotonically
>>> increasing field, and that's why the given query [1] would work. In other
>>> words, we must guarantee that the "SELECT MAX(t22.rowtime) ..." subquery
>>> returns a single value that can be retrieved from the cached dynamic
>>> table, since it's dangerous to join two un-windowed streams.
>>>
>>> Under this circumstance, I would consider adding a "monotonic hint" (INC
>>> or DEC) to a field of a (generalized) table (maybe using an annotation on
>>> the registerDataXX method) to indicate whether the field is
>>> monotonically increasing or decreasing. Then, by treating rowtime as an
>>> ordinary (monotonically increasing) field, there are several benefits:
>>>
>>> 1) This can unify tables and streams by imposing a total ordering
>>> relation on an unordered set.
>>>
>>> 2) These fields can be modified arbitrarily as long as they keep the
>>> declared monotonicity, and the watermark problem no longer exists.
>>>
>>> 3) The monotonic hint will be useful in the query optimization process.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Xingcan
>>>
>>> [1]
>>> SELECT t1.amount, t2.rate
>>> FROM
>>>    table1 AS t1,
>>>    table2 AS t2
>>> WHERE
>>>    t1.currency = t2.currency AND
>>>    t2.rowtime = (
>>>      SELECT MAX(t22.rowtime)
>>>      FROM table2 AS t22
>>>      WHERE t22.currency = t1.currency
>>>      AND t22.rowtime <= t1.rowtime)
>>>
>>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>> Hi everybody,
>>>>
>>>> When Timo wrote to the Calcite mailing list, Julian Hyde replied,
>>>> gave good advice, and explained why a system attribute for event time
>>>> would be a problem [1].
>>>> I thought about this and agree with Julian.
>>>>
>>>> Here is a document that describes the problem, the constraints in Flink,
>>>> and a proposal for how to handle processing time and event time in the
>>>> Table API and SQL:
>>>>
>>>> -> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ
>>>>
>>>> Please have a look, comment and ask questions.
>>>>
>>>> Thank you,
>>>> Fabian
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
>>>>
>>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>> Thanks everybody for the comments.
>>>>>
>>>>> Actually, I think we do not have much choice when deciding whether to
>>>>> use attributes or functions.
>>>>> Consider the following join query:
>>>>>
>>>>> SELECT t1.amount, t2.rate
>>>>> FROM
>>>>>    table1 AS t1,
>>>>>    table2 AS t2
>>>>> WHERE
>>>>>    t1.currency = t2.currency AND
>>>>>    t2.rowtime = (
>>>>>      SELECT MAX(t22.rowtime)
>>>>>      FROM table2 AS t22
>>>>>      WHERE t22.currency = t1.currency
>>>>>      AND t22.rowtime <= t1.rowtime)
>>>>>
>>>>> The query joins two streaming tables. Table 1 is a streaming table with
>>>>> amounts in a certain currency. Table 2 is a (slowly changing) streaming
>>>>> table of currency exchange rates.
>>>>> We want to join the amounts stream with the exchange rate of the
>>>>> corresponding currency that is valid (i.e., last received value ->
>>>>> MAX(rowtime)) at the rowtime of the amounts row.
>>>>> In order to specify the query, we need to refer to the rowtime of the
>>>>> different tables. Hence, we need a way to relate the rowtime expression
>>>>> (or marker) to a table.
>>>>> This is not possible with a parameterless scalar function.
>>>>>
>>>>> I'd like to comment on the concerns regarding the performance:
>>>>> In fact, the columns could be completely virtual and only exist during
>>>>> query parsing and validation.
>>>>> During execution, we can directly access the rowtime metadata of a
>>>>> Flink streaming record (which is present anyway) or look up the current
>>>>> processing time from the machine clock. So the processing overhead
>>>>> would actually be the same as with a marker function.
>>>>>
>>>>> Regarding the question of what should be allowed with a system
>>>>> attribute:
>>>>> IMO, it could be used like any other attribute. We need it at least in
>>>>> GROUP BY, ORDER BY, and WHERE to define windows and joins. We could also
>>>>> allow access to it in SELECT if we want to give users access to rowtime
>>>>> and processing time. So @Haohui, your query could be supported.
>>>>> However, what would not be allowed is modifying the value of the rows,
>>>>> e.g., by naming another column rowtime: "SELECT sometimestamp AS
>>>>> rowtime" would not be allowed, because Flink does not support modifying
>>>>> the event time of a row (for good reasons), and processing time should
>>>>> not be modifiable anyway.
>>>>>
>>>>> @Timo:
>>>>> I think the approach of using the system columns only during parsing and
>>>>> validation and converting them to expressions afterwards makes a lot of
>>>>> sense.
>>>>> The question is how this approach could be nicely integrated with
>>>>> Calcite.
>>>>
>>>>> Best, Fabian
>>>>>
>>>>>
>>>>>
>>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>>>>>
>>>>> Hi,
>>>>>>
>>>>>> My initial thought would be that it makes more sense to have
>>>>>> procTime() and rowTime() only as functions which are in fact used as
>>>>>> markers. Having the value (even from special system attributes) does
>>>>>> not make sense in some scenarios, such as the ones for creating
>>>>>> windows, e.g.,
>>>>>> if you have SELECT Count(*) OVER (ORDER BY procTime()...).
>>>>>> If you get the value of procTime you cannot do anything, as you need
>>>>>> the marker to know how to construct the window logic.
>>>>>>
>>>>>> However, your final idea of having "implement some rule/logic that
>>>>>> translates the attributes to special RexNodes internally" is, I
>>>>>> believe, good and gives a solution to both problems. On the one hand,
>>>>>> for those scenarios where you need the value you can access the value,
>>>>>> while for others you can see the special type of the RexNode and use
>>>>>> it as a marker.
>>>>>>
>>>>>> Regarding keeping this data in a table... I am not sure, as you would
>>>>>> say we need to augment the data with two fields whether needed or
>>>>>> not... this is not necessarily very efficient.
>>>>>>
>>>>>>
>>>>>> Dr. Radu Tudoran
>>>>>> Senior Research Engineer - Big Data Expert
>>>>>> IT R&D Division
>>>>>>
>>>>>>
>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>>> European Research Center
>>>>>> Riesstrasse 25, 80992 München
>>>>>>
>>>>>> E-mail: radu.tudo...@huawei.com
>>>>>> Mobile: +49 15209084330
>>>>>> Telephone: +49 891588344173
>>>>>>
>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>>> This e-mail and its attachments contain confidential information from
>>>>>> HUAWEI, which is intended only for the person or entity whose address
>>>>>> is listed above. Any use of the information contained herein in any way
>>>>>> (including, but not limited to, total or partial disclosure,
>>>>>> reproduction, or dissemination) by persons other than the intended
>>>>>> recipient(s) is prohibited. If you receive this e-mail in error, please
>>>>>> notify the sender by phone or email immediately and delete it!
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Timo Walther [mailto:twal...@apache.org]
>>>>>> Sent: Wednesday, February 15, 2017 9:33 AM
>>>>>> To: dev@flink.apache.org
>>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
>>>>>> processing time
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> at first I also thought that built-in functions (rowtime() and
>>>>>> proctime()) are the easiest solution. However, I think to be
>>>>>> future-proof we should make them system attributes, especially to
>>>>>> relate them to a corresponding table in case of multiple tables.
>>>>>> Logically they are attributes of each row, which is already how it is
>>>>>> done in the Table API.
>>>>>>
>>>>>> I will ask on the Calcite ML if there is a good way of integrating
>>>>>> system attributes. Right now, I would propose the following
>>>>>> implementation:
>>>>>>
>>>>>> - we introduce a custom row type (extending RelDataType)
>>>>>> - in a streaming environment every row has two attributes by default
>>>>>> (rowtime and proctime)
>>>>>> - we do not allow creating a row type with those attributes (this
>>>>>> should already prevent `SELECT field AS rowtime FROM ...`)
>>>>>> - we need to ensure that these attributes are not part of expansion
>>>>>> like `SELECT * FROM ...`
>>>>>> - implement some rule/logic that translates the attributes to special
>>>>>> RexNodes internally, such that the optimizer does not modify these
>>>>>> attributes
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 15/02/17 at 03:36, Xingcan Cui wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> thanks for this thread.
>>>>>>>
>>>>>>> @Fabian If I didn't miss the point, the main difference between the
>>>>>>> two approaches is whether or not to treat these time attributes as
>>>>>>> common table fields that are directly available to users. Either way,
>>>>>>> these time attributes should be attached to records (right?), and the
>>>>>>> discussion is about whether to give them public qualifiers like other
>>>>>>> common fields, or private qualifiers and related get/set methods.
>>>>>>>
>>>>>>> The former (system attributes) approach will be more compatible with
>>>>>>> existing SQL read-only operations (e.g., select, join), but we need to
>>>>>>> add restrictions on SQL modification operations (like what?). I think
>>>>>>> there is no need to forbid users from modifying these attributes via
>>>>>>> Table APIs (like the map function). Just inform them about these
>>>>>>> special attribute names, like the system built-in aggregator names in
>>>>>>> iteration.
>>>>>>>
>>>>>>> As for the built-in function approach, I don't know if, for now, there
>>>>>>> are functions applied to a single row (maybe the value access
>>>>>>> functions like COMPOSITE.get(STRING)?). It seems that most of the
>>>>>>> built-in functions work on a single field or on columns, and thus it
>>>>>>> will be a mountain of work if we want to add a new kind of function to
>>>>>>> SQL. Maybe all existing operations would have to be modified to
>>>>>>> support it.
>>>>>>>
>>>>>>> All in all, if there is existing support for single-row functions, I
>>>>>>> prefer the built-in function approach. Otherwise the system attributes
>>>>>>> approach should be better. After all, there are not so many
>>>>>>> modification operations in SQL, and maybe we can use aliases to support
>>>>>>> setting time attributes (just a hypothesis, not sure if it's feasible).
>>>>>>>
>>>>>>> @Haohui I think the given query is valid if we add an aggregate
>>>>>>> function to (PROCTIME() - ROWTIME()) / 1000, and it should be executed
>>>>>>> efficiently.
>>>>>>>
>>>>>>> Best,
>>>>>>> Xingcan
>>>>>>>
>>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for starting the discussion. I can see there are multiple
>>>>>>>> trade-offs in these two approaches. One question I have is to
>>>>>>>> what extent Flink wants to open its APIs to allow users to access
>>>>>>>> both processing and event time.
>>>>>>>>
>>>>>>>> Before we talk about joins, my understanding is that the two
>>>>>>>> approaches you mentioned are essentially (1) treating the value of
>>>>>>>> event / processing time as first-class fields of each row, and (2)
>>>>>>>> limiting the scope of time indicators to only specifying windows.
>>>>>>>> Take the following query as an example:
>>>>>>>>
>>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY
>>>>>>>> FLOOR(PROCTIME() TO MINUTES)
>>>>>>>>
>>>>>>>> There are several questions we can ask:
>>>>>>>>
>>>>>>>> (1) Is it a valid query?
>>>>>>>> (2) How efficient will the query be?
>>>>>>>>
>>>>>>>> For this query I can see arguments from both sides. I think at the
>>>>>>>> end of the day it really comes down to what Flink wants to support.
>>>>>>>> After working on FLINK-5624 I'm more inclined to support the second
>>>>>>>> approach (i.e., built-in functions). The main reason is that the
>>>>>>>> APIs of Flink are designed to separate times from the real payloads.
>>>>>>>> It probably makes sense for the Table / SQL APIs to have the same
>>>>>>>> design.
>>>>>>>>
>>>>>>>> For joins I don't have a clear answer off the top of my head. Flink
>>>>>>>> requires two streams to be put in the same window before doing the
>>>>>>>> joins. This is essentially a subset of what SQL can express. I don't
>>>>>>>> know what would be the best approach here.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Haohui
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> It would be as in the query I gave as an example before:
>>>>>>>>>
>>>>>>>>> SELECT
>>>>>>>>>     a,
>>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>>>>>>>>>       PRECEDING AND CURRENT ROW) AS sumB
>>>>>>>>> FROM myStream
>>>>>>>>>
>>>>>>>>> Here "proctime" would be a system attribute of the table "myStream".
>>>>>>>>> The table would also have another system attribute called "rowtime"
>>>>>>>>> which would be used to indicate event time semantics.
>>>>>>>>> These attributes would always be present in tables which are derived
>>>>>>>>> from streams.
>>>>>>>>> Because we still require that streams have timestamps and watermarks
>>>>>>>>> assigned (either by the StreamTableSource or somewhere downstream in
>>>>>>>>> the DataStream program) when they are converted into a table, there
>>>>>>>>> is no need to register anything.
>>>>>>>>>
>>>>>>>>> Does that answer your questions?
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>>>>>>>>>
>>>>>>>>> Hi Fabian,
>>>>>>>>>>
>>>>>>>>>> Thanks for starting the discussion. Before I give my thoughts on
>>>>>>>>>> this, can you please give some examples of how you would see the
>>>>>>>>>> option of using "system attributes"?
>>>>>>>>>> Do you use this when you register the stream as a table, do you use
>>>>>>>>>> it when you call an SQL query, do you use it when you translate a
>>>>>>>>>> table back to a stream / write it to a dynamic table?
>>>>>>>>>>
>>>>>>>>>> Dr. Radu Tudoran
>>>>>>>>>>
>>>>>>>>>> -----Original Message-----
>>>>>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
>>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM
>>>>>>>>>> To: dev@flink.apache.org
>>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event and processing time
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'd like to start a discussion about how Table API / SQL queries
>>>>>>>>>> indicate whether an operation is done in event or processing time.
>>>>>>>>>>
>>>>>>>>>> 1) Why do we need to indicate the time mode?
>>>>>>>>>>
>>>>>>>>>> We need to distinguish event time and processing time mode for
>>>>>>>>>> operations in queries in order to have the semantics of a query
>>>>>>>>>> fully defined.
>>>>>>>>>> This cannot be done globally in the TableEnvironment because some
>>>>>>>>>> queries explicitly request an expression, such as the ORDER BY clause
>>>>>>>>>> of an OVER window with PRECEDING / FOLLOWING clauses.
>>>>>>>>>> So we need a way to specify something like the following query:
>>>>>>>>>>
>>>>>>>>>> SELECT
>>>>>>>>>>     a,
>>>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>>>>>>>>>>       PRECEDING AND CURRENT ROW) AS sumB
>>>>>>>>>> FROM myStream
>>>>>>>>>>
>>>>>>>>>> where "proctime" indicates processing time. Equivalently, "rowtime"
>>>>>>>>>> would indicate event time.
>>>>>>>>>>
>>>>>>>>>> 2) Current state
>>>>>>>>>>
>>>>>>>>>> The current master branch implements time support only for grouping
>>>>>>>>>> windows in the Table API.
>>>>>>>>>> Internally, the Table API converts a 'rowtime symbol (which looks
>>>>>>>>>> like a regular attribute) into a special expression which indicates
>>>>>>>>>> event time.
>>>>>>>>>> For example:
>>>>>>>>>>
>>>>>>>>>> table
>>>>>>>>>>     .window(Tumble over 5.milli on 'rowtime as 'w)
>>>>>>>>>>     .groupBy('a, 'w)
>>>>>>>>>>     .select(...)
>>>>>>>>>>
>>>>>>>>>> defines a tumbling event-time window.
>>>>>>>>>>
>>>>>>>>>> Processing-time is indicated by omitting a time attribute
>>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ).
>>>>>>>>>>
>>>>>>>>>> 3) How can we do that in SQL?
>>>>>>>>>>
>>>>>>>>>> In SQL we cannot add special expressions without touching the
>>>>>>>>>> parser, which we don't want to do because we want to stick to the
>>>>>>>>>> SQL standard.
>>>>>>>>>> Therefore, I see only two options: adding system attributes or
>>>>>>>>>> (parameterless) built-in functions. I list some pros and cons of the
>>>>>>>>>> approaches below:
>>>>>>>>>>
>>>>>>>>>> 1. System Attributes:
>>>>>>>>>> + most natural way to access a property of a record
>>>>>>>>>> + works with joins, because time attributes can be related to tables
>>>>>>>>>> - we need to ensure the attributes are not writable and always
>>>>>>>>>> present in streaming tables (i.e., they should be system-defined
>>>>>>>>>> attributes)
>>>>>>>>>> - need to adapt existing Table API expressions (will not change the
>>>>>>>>>> API but some parts of the internal translation)
>>>>>>>>>> - event time value must be set when the stream is converted,
>>>>>>>>>> processing time is evaluated on the fly
>>>>>>>>>>
>>>>>>>>>> 2. Built-in Functions
>>>>>>>>>> + users cannot modify time attributes (which they could try with
>>>>>>>>>> system attributes), because this is not possible with functions
>>>>>>>>>> - do not work with joins, because we need to address different
>>>>>>>>>> relations
>>>>>>>>>> - not a natural way to access a property of a record
>>>>>>>>>>
>>>>>>>>>> I think the only viable choice is system attributes, because
>>>>>>>>>> built-in functions cannot be used for joins.
>>>>>>>>>> However, system attributes are the more complex solution because
>>>>>>>>>> they need a better integration with Calcite's SQL validator
>>>>>>>>>> (preventing user attributes which are named rowtime, for instance).
>>>>>>>>>>
>>>>>>>>>> Since there are currently several contributions on the way (such as
>>>>>>>>>> SQL OVER windows, FLINK-5653 to FLINK-5658) that need time
>>>>>>>>>> indicators, we need a solution soon to be able to make progress.
>>>>>>>>>> There are two PRs, #3252 and #3271, which implement the built-in
>>>>>>>>>> marker functions proctime() and rowtime() and which could serve as a
>>>>>>>>>> temporary solution (since we do not work on joins yet).
>>>>>>>>>> I would like to suggest using these functions as a starting point
>>>>>>>>>> (once the PRs are merged) and later changing to the system attribute
>>>>>>>>>> solution, which needs a bit more time to be implemented.
>>>>>>>>>>
>>>>>>>>>> I talked with Timo today about this issue and he said he would like
>>>>>>>>>> to investigate how we can implement this as system functions
>>>>>>>>>> properly integrated with Calcite and the SQL Validator.
>>>>>>>>>>
>>>>>>>>>> What do others think?
>>>>>>>>>>
>>>>>>>>>> Best, Fabian
>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>
