Hi Jincheng Sun,

registering watermark functions for different attributes to allow each of
them to be used in a window is an interesting idea.

However, watermarks only work well if the streaming data is (almost) in
timestamp order. Since it is not possible to sort a stream, all attributes
that would qualify as event-time attributes need to be in almost the same
order. I think this limits the benefits of having multiple watermark
functions quite significantly. But maybe you have a good use case that you
can share where multiple event-time attributes would work well.
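
To illustrate the ordering concern, here is a minimal Python sketch. It is a
simplified model and not Flink code: the names Event, order_time,
shipping_time and count_late are made up for this example, and the watermark
is assumed to simply trail the largest value seen so far by a fixed delay.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Event:
    order_time: int     # hypothetical event-time attribute 1
    shipping_time: int  # hypothetical event-time attribute 2


def count_late(values: Iterable[int], max_delay: int) -> int:
    """Count values that arrive behind a simple watermark.

    Assumption: the watermark is (largest value seen so far - max_delay).
    A value counts as late if it is smaller than that watermark on arrival.
    """
    max_seen = None
    late = 0
    for v in values:
        if max_seen is not None and v < max_seen - max_delay:
            late += 1
        max_seen = v if max_seen is None else max(max_seen, v)
    return late


# The stream arrives almost sorted by order_time, but not by shipping_time.
stream: List[Event] = [
    Event(1, 50), Event(2, 10), Event(4, 40),
    Event(3, 12), Event(5, 30), Event(6, 11),
]

late_order = count_late((e.order_time for e in stream), max_delay=2)
late_shipping = count_late((e.shipping_time for e in stream), max_delay=2)
print(late_order, late_shipping)
```

With the same delay, no record is late with respect to order_time, while
most records are late with respect to shipping_time. A second watermark
function only helps if the second attribute arrives in almost the same order.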

So far our approach has been that a DataStream which is converted into a
Table already has timestamps and watermarks assigned. We also assumed that
a StreamTableSource would provide watermarks and timestamps and indicate
the name of the attribute that carries the timestamp.

@Stefano: That's great news. I'd suggest opening a pull request and having a
look at PR #3397, which handles the (partitioned) unbounded case. It would be
good to share some code between these approaches.

Thanks, Fabian

2017-02-28 18:17 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>:

> Hi all,
>
> I have completed a first implementation that works for the SQL query
> SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2
> PRECEDING AND CURRENT ROW) AS sumB FROM MyTable
>
> I have SUM, MAX, MIN, AVG, and COUNT implemented, but so far I could only
> test them on simple queries such as the one above. Is there any specific
> case I should be looking at?
>
> Regards,
> Stefano
>
> -----Original Message-----
> From: jincheng sun [mailto:sunjincheng...@gmail.com]
> Sent: Tuesday, February 28, 2017 12:26 PM
> To: dev@flink.apache.org
> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing
> time
>
> Hi everyone, thanks for sharing your thoughts. I really like Timo’s
> proposal, and I have a few thoughts I want to share.
>
> We want to keep the query the same for batch and streaming. IMO,
> “processing time” is something specific to DataStream, while it is not a
> well-defined term for batch queries. So we are fairly free to create
> something new for processing time. I think it is a good idea to add
> proctime as a reserved keyword for SQL.
>
> Regarding “event time”, it is well defined for batch queries. So IMO, we
> should keep the way of defining a streaming window exactly the same as for
> a batch window. Therefore, the column for event time is nothing special,
> just a normal column. The major difference between batch and stream is
> that in DataStream the event-time column must be associated with a
> watermark function. I really like the way Timo proposed that we can select
> any column as rowtime. But I think instead of just declaring that a column
> is a rowtime (actually I do not think we need this special rowtime
> keyword), it is better to register/associate the watermark function with
> this column when creating the table. For DataStream, we will accept a
> rowtime column only if it has been associated with a watermark function.
> Prototype code to show how this could look is below:
>
>   TableAPI:
>      toTable(tEnv, 'a, 'b, 'c)
>       .registeredWatermarks('a, waterMarkFunction1)
>
>      batchOrStreamTable
>       .window(Tumble over 5.milli on 'a as 'w)
>       .groupBy('w, 'b)
>       .select('b, 'a.count as cnt1, 'c.sum as cnt2)
>
>   SQL:
>     addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
>       .registeredWatermarks('a, waterMarkFunction1)
>
>     SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2
>     PRECEDING AND CURRENT ROW) AS sumB FROM MyTable
>
> What do you think?
>
> 2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>:
>
> > Hi everyone,
> >
> > I have created an issue [1] to track the progress of this topic. I have
> > written a short design document [2] on how we could implement the
> > indicators and which parts have to be touched. I would suggest
> > implementing a prototype, also to see what is possible and what can be
> > integrated in both Flink and Calcite. Feedback is welcome.
> >
> > Regards,
> > Timo
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-5884
> > [2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing
> >
> >
> >
> > On 21/02/17 at 15:06, Fabian Hueske wrote:
> >
> > Hi Xingcan,
> >>
> >> thanks for your thoughts.
> >> In principle you are right that the monotone attribute property would
> >> be sufficient, however there are more aspects to consider than that.
> >>
> >> Flink is a parallel stream processing engine, which means that data is
> >> processed in separate processes and shuffled across them.
> >> Maintaining a strict order when merging parallel streams would be
> >> prohibitively expensive.
> >> Flink's watermark mechanism helps operators to deal with out-of-order
> >> data (due to out-of-order input or shuffles).
> >> I don't think we can separate the discussion about time attributes
> >> from watermarks if we want to use Flink as a processing engine and
> >> not reimplement large parts from scratch.
> >>
> >> When transforming a time attribute, we have to either align it with
> >> existing watermarks or generate new watermarks.
> >> If we want to allow all kinds of monotone transformations, we have to
> >> adapt the watermarks which is not trivial.
> >> Instead, I think we should initially only allow very few monotone
> >> transformations which are aligned with the existing watermarks. We
> >> might later relax this condition if we see that users request this
> >> feature.
> >>
> >> You are right that we need to track which attribute can be used as a
> >> time attribute (i.e., is increasing and guarded by watermarks).
> >> For that we need to expose the time attribute when a Table is created
> >> (either when a DataStream is converted, like
> >> stream.toTable(tEnv, 'a, 'b, 't.rowtime), or in a StreamTableSource)
> >> and track how it is used in queries.
> >> I am not sure if the monotone property would be the right choice
> >> here, since data is only quasi-monotone and a monotone annotation
> >> might trigger some invalid optimizations which change the semantics of
> >> a query.
> >> Right now, Calcite does not offer a quasi-monotone property (at least
> >> I haven't found it).
> >>
> >> Best, Fabian
> >>
> >>
> >> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
> >>
> >> Hi all,
> >>>
> >>> As I said in another thread, the main difference between a stream and
> >>> a table is that a stream is an ordered list while a table is an
> >>> unordered set.
> >>>
> >>> If we ignore the out-of-order problem that occurs in practice, both
> >>> event time and processing time can simply be treated as a
> >>> monotonically increasing field, and that's why the given query [1]
> >>> would work. In other words, we must guarantee that the
> >>> "SELECT MAX(t22.rowtime) ..." subquery returns a single value that
> >>> can be retrieved from the cached dynamic table, since it's dangerous
> >>> to join two un-windowed streams.
> >>>
> >>> Under these circumstances, I would consider adding a "monotonic hint"
> >>> (INC or DEC) to a field of a (generalized) table (maybe using an
> >>> annotation on the registerDataXX method) that indicates whether the
> >>> field is monotonically increasing or decreasing. Treating rowtime as
> >>> an ordinary (monotonically increasing) field then has several
> >>> benefits:
> >>>
> >>> 1) This can unify tables and streams by imposing a total ordering
> >>> relation on an otherwise unordered set.
> >>>
> >>> 2) These fields can be modified arbitrarily as long as they keep the
> >>> declared monotonic property, so the watermark problem no longer
> >>> exists.
> >>>
> >>> 3) The monotonic hint will be useful in the query optimization process.
> >>>
> >>> What do you think?
> >>>
> >>> Best,
> >>> Xingcan
> >>>
> >>> [1]
> >>> SELECT t1.amount, t2.rate
> >>> FROM
> >>>    table1 AS t1,
> >>>    table2 AS t2
> >>> WHERE
> >>>    t1.currency = t2.currency AND
> >>>    t2.rowtime = (
> >>>      SELECT MAX(t22.rowtime)
> >>>      FROM table2 AS t22
> >>>      WHERE t22.currency = t1.currency
> >>>      AND t22.rowtime <= t1.rowtime)
> >>>
> >>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com>
> >>> wrote:
> >>>
> >>> Hi everybody,
> >>>>
> >>>> When Timo wrote to the Calcite mailing list, Julian Hyde replied
> >>>> and gave good advice and explained why a system attribute for
> >>>> event-time would be a problem [1].
> >>>> I thought about this and agree with Julian.
> >>>>
> >>>> Here is a document to describe the problem, constraints in Flink
> >>>> and a proposal how to handle processing time and event time in
> >>>> Table API and SQL:
> >>>>
> >>>> ->
> >>>> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ
> >>>>
> >>>> Please have a look, comment and ask questions.
> >>>>
> >>>> Thank you,
> >>>> Fabian
> >>>>
> >>>> [1]
> >>>> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
> >>>>
> >>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>
> >>>> Thanks everybody for the comments.
> >>>>>
> >>>>> Actually, I think we do not have much choice when deciding whether
> >>>>> to use attributes or functions.
> >>>>> Consider the following join query:
> >>>>>
> >>>>> SELECT t1.amount, t2.rate
> >>>>> FROM
> >>>>>    table1 AS t1,
> >>>>>    table2 AS t2
> >>>>> WHERE
> >>>>>    t1.currency = t2.currency AND
> >>>>>    t2.rowtime = (
> >>>>>      SELECT MAX(t22.rowtime)
> >>>>>      FROM table2 AS t22
> >>>>>      WHERE t22.currency = t1.currency
> >>>>>      AND t22.rowtime <= t1.rowtime)
> >>>>>
> >>>>> The query joins two streaming tables. Table 1 is a streaming table
> >>>>> with amounts in a certain currency. Table 2 is a (slowly changing)
> >>>>> streaming table of currency exchange rates.
> >>>>> We want to join the amounts stream with the exchange rate of the
> >>>>> corresponding currency that is valid (i.e., last received value ->
> >>>>> MAX(rowtime)) at the rowtime of the amounts row.
> >>>>> In order to specify the query, we need to refer to the rowtime of
> >>>>> the different tables. Hence, we need a way to relate the rowtime
> >>>>> expression (or marker) to a table.
> >>>>> This is not possible with a parameterless scalar function.
> >>>>>
> >>>>> I'd like to comment on the concerns regarding the performance:
> >>>>> In fact, the columns could be completely virtual and only exist
> >>>>> during query parsing and validation.
> >>>>> During execution, we can directly access the rowtime metadata of a
> >>>>> Flink streaming record (which is present anyway) or look up the
> >>>>> current processing time from the machine clock. So the processing
> >>>>> overhead would actually be the same as with a marker function.
> >>>>>
> >>>>> Regarding the question on what should be allowed with a system
> >>>>> attribute:
> >>>>> IMO, it could be used as any other attribute. We need it at least
> >>>>> in GROUP BY, ORDER BY, and WHERE to define windows and joins. We
> >>>>> could also allow access to it in SELECT if we want to give users
> >>>>> access to rowtime and processing time. So @Haohui, your query could
> >>>>> be supported.
> >>>>> However, what would not be allowed is to modify the time value of
> >>>>> a row by naming another column rowtime, e.g., "SELECT
> >>>>> sometimestamp AS rowtime" would not be allowed, because Flink does
> >>>>> not support modifying the event time of a row (for good reasons)
> >>>>> and processing time should not be modifiable anyway.
> >>>>>
> >>>>> @Timo:
> >>>>> I think the approach to only use the system columns during parsing
> >>>>> and validation and converting them to expressions afterwards makes
> >>>>> a lot of sense.
> >>>>> The question is how this approach could be nicely integrated with
> >>>>> Calcite.
> >>>>>
> >>>>> Best, Fabian
> >>>>>
> >>>>>
> >>>>>
> >>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> >>>>>
> >>>>> Hi,
> >>>>>>
> >>>>>> My initial thought would be that it makes more sense to have
> >>>>>> procTime() and rowTime() only as functions which in fact are to be
> >>>>>> used as markers. Having the value (even from special system
> >>>>>> attributes) does not make sense in some scenarios, such as the ones
> >>>>>> for creating windows. E.g., if you have
> >>>>>> SELECT Count(*) OVER (ORDER BY procTime()...)
> >>>>>> and you get the value of procTime, you cannot do anything, as you
> >>>>>> need the marker to know how to construct the window logic.
> >>>>>>
> >>>>>> However, your final idea of having "implement some rule/logic
> >>>>>> that translates the attributes to special RexNodes internally" is,
> >>>>>> I believe, good and gives a solution to both problems. On the one
> >>>>>> hand, in those scenarios where you need the value you can access
> >>>>>> the value, while in others you can see the special type of the
> >>>>>> RexNode and use it as a marker.
> >>>>>> Regarding keeping this data in a table... I am not sure, as you
> >>>>>> would say we need to augment the data with two fields whether
> >>>>>> needed or not... this is not necessarily very efficient.
> >>>>>>
> >>>>>>
> >>>>>> Dr. Radu Tudoran
> >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> >>>>>>
> >>>>>>
> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >>>>>> European Research Center
> >>>>>> Riesstrasse 25, 80992 München
> >>>>>>
> >>>>>> E-mail: radu.tudo...@huawei.com
> >>>>>> Mobile: +49 15209084330
> >>>>>> Telephone: +49 891588344173
> >>>>>>
> >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> >>>>>> This e-mail and its attachments contain confidential information
> >>>>>> from HUAWEI, which is intended only for the person or entity whose
> >>>>>> address is listed above. Any use of the information contained
> >>>>>> herein in any way (including, but not limited to, total or partial
> >>>>>> disclosure, reproduction, or dissemination) by persons other than
> >>>>>> the intended recipient(s) is prohibited. If you receive this
> >>>>>> e-mail in error, please notify the sender by phone or email
> >>>>>> immediately and delete it!
> >>>>>>
> >>>>>> -----Original Message-----
> >>>>>> From: Timo Walther [mailto:twal...@apache.org]
> >>>>>> Sent: Wednesday, February 15, 2017 9:33 AM
> >>>>>> To: dev@flink.apache.org
> >>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> >>>>>> processing time
> >>>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> at first I also thought that built-in functions (rowtime() and
> >>>>>> proctime()) are the easiest solution. However, I think to be
> >>>>>> future-proof we should make them system attributes, especially to
> >>>>>> relate them to a corresponding table in case of multiple tables.
> >>>>>> Logically they are attributes of each row, which is already how it
> >>>>>> is done in the Table API.
> >>>>>>
> >>>>>> I will ask on the Calcite ML if there is a good way for integrating
> >>>>>> system attributes. Right now, I would propose the following
> >>>>>> implementation:
> >>>>>>
> >>>>>> - we introduce a custom row type (extending RelDataType)
> >>>>>> - in a streaming environment every row has two attributes by default
> >>>>>>   (rowtime and proctime)
> >>>>>> - we do not allow creating a row type with those attributes (this
> >>>>>>   should already prevent `SELECT field AS rowtime FROM ...`)
> >>>>>> - we need to ensure that these attributes are not part of expansion
> >>>>>>   like `SELECT * FROM ...`
> >>>>>> - implement some rule/logic that translates the attributes to
> >>>>>>   special RexNodes internally, such that the optimizer does not
> >>>>>>   modify these attributes
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 15/02/17 at 03:36, Xingcan Cui wrote:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> thanks for this thread.
> >>>>>>>
> >>>>>>> @Fabian If I didn't miss the point, the main difference between the
> >>>>>>> two approaches is whether or not these time attributes are treated
> >>>>>>> as common table fields that are directly available to users. In
> >>>>>>> either case, these time attributes should be attached to records
> >>>>>>> (right?), and the discussion is about whether to give them public
> >>>>>>> qualifiers like other common fields, or private qualifiers and
> >>>>>>> related get/set methods.
> >>>>>>>
> >>>>>>> The former (system attributes) approach will be more compatible
> >>>>>>> with existing SQL read-only operations (e.g., select, join), but we
> >>>>>>> need to add restrictions on SQL modification operations (like
> >>>>>>> what?). I think there is no need to forbid users from modifying
> >>>>>>> these attributes via Table APIs (like a map function). We can just
> >>>>>>> inform them about these special attribute names, like the system
> >>>>>>> built-in aggregator names in iterations.
> >>>>>>>
> >>>>>>> As for the built-in function approach, I don't know if, for now,
> >>>>>>> there are functions applied on a single row (maybe the value access
> >>>>>>> functions like COMPOSITE.get(STRING)?). It seems that most of the
> >>>>>>> built-in functions work on a single field or on columns, and thus
> >>>>>>> it will be a mountain of work if we want to add a new kind of
> >>>>>>> function to SQL. Maybe all existing operations would have to be
> >>>>>>> modified to support it.
> >>>>>>>
> >>>>>>> All in all, if there is existing support for single-row functions,
> >>>>>>> I prefer the built-in function approach. Otherwise the system
> >>>>>>> attributes approach should be better. After all, there are not that
> >>>>>>> many modification operations in SQL, and maybe we can use aliases
> >>>>>>> to support setting time attributes (just a hypothesis, not sure if
> >>>>>>> it's feasible).
> >>>>>>>
> >>>>>>> @Haohui I think the given query is valid if we add an aggregate
> >>>>>>> function to (PROCTIME() - ROWTIME()) / 1000, and it should be
> >>>>>>> executed efficiently.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Xingcan
> >>>>>>>
> >>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Thanks for starting the discussion. I can see there are multiple
> >>>>>>>> trade-offs in these two approaches. One question I have is to
> >>>>>>>> what extent Flink wants to open its APIs to allow users to access
> >>>>>>>> both processing and event time.
> >>>>>>>>
> >>>>>>>> Before we talk about joins, my understanding of the two approaches
> >>>>>>>> that you mentioned is that they are essentially (1) treating the
> >>>>>>>> value of event / processing time as first-class fields for each
> >>>>>>>> row, (2) limiting the scope of time indicators to only specifying
> >>>>>>>> windows. Take the following query as an example:
> >>>>>>>>
> >>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table
> >>>>>>>> GROUP BY FLOOR(PROCTIME() TO MINUTES)
> >>>>>>>>
> >>>>>>>> There are several questions we can ask:
> >>>>>>>>
> >>>>>>>> (1) Is it a valid query?
> >>>>>>>> (2) How efficient will the query be?
> >>>>>>>>
> >>>>>>>> For this query I can see arguments from both sides. I think at the
> >>>>>>>> end of the day it really comes down to what Flink wants to
> >>>>>>>> support. After working on FLINK-5624 I'm more inclined to support
> >>>>>>>> the second approach (i.e., built-in functions). The main reason is
> >>>>>>>> that the APIs of Flink are designed to separate times from the
> >>>>>>>> real payloads. It probably makes sense for the Table / SQL APIs to
> >>>>>>>> have the same design.
> >>>>>>>>
> >>>>>>>> For joins I don't have a clear answer off the top of my head.
> >>>>>>>> Flink requires two streams to be put in the same window before
> >>>>>>>> doing the joins. This is essentially a subset of what SQL can
> >>>>>>>> express. I don't know what would be the best approach here.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Haohui
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> It would be as in the query I gave as an example before:
> >>>>>>>>>
> >>>>>>>>> SELECT
> >>>>>>>>>     a,
> >>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
> >>>>>>>>>     PRECEDING AND CURRENT ROW) AS sumB
> >>>>>>>>> FROM myStream
> >>>>>>>>>
> >>>>>>>>> Here "proctime" would be a system attribute of the table
> >>>>>>>>> "myStream".
> >>>>>>>>> The table would also have another system attribute called
> >>>>>>>>> "rowtime" which would be used to indicate event time semantics.
> >>>>>>>>> These attributes would always be present in tables which are
> >>>>>>>>> derived from streams.
> >>>>>>>>> Because we still require that streams have timestamps and
> >>>>>>>>> watermarks assigned (either by the StreamTableSource or
> >>>>>>>>> somewhere downstream in the DataStream program) when they are
> >>>>>>>>> converted into a table, there is no need to register anything.
> >>>>>>>>>
> >>>>>>>>> Does that answer your questions?
> >>>>>>>>>
> >>>>>>>>> Best, Fabian
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> >>>>>>>>>
> >>>>>>>>>> Hi Fabian,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for starting the discussion. Before I give my thoughts on
> >>>>>>>>>> this, can you please give some examples of how you would see the
> >>>>>>>>>> option of using "system attributes"?
> >>>>>>>>>> Do you use this when you register the stream as a table, do you
> >>>>>>>>>> use it when you call an SQL query, do you use it when you
> >>>>>>>>>> translate a table back to a stream / write it to a dynamic table?
> >>>>>>>>>>
> >>>>>>>>>> Dr. Radu Tudoran
> >>>>>>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> >>>>>>>>>>
> >>>>>>>>>> -----Original Message-----
> >>>>>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
> >>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM
> >>>>>>>>>> To: dev@flink.apache.org
> >>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event and
> >>>>>>>>>> processing time
> >>>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I'd like to start a discussion about how Table API / SQL queries
> >>>>>>>>>> indicate whether an operation is done in event or processing time.
> >>>>>>>>>>
> >>>>>>>>>> 1) Why do we need to indicate the time mode?
> >>>>>>>>>>
> >>>>>>>>>> We need to distinguish event time and processing time mode for
> >>>>>>>>>> operations in queries in order to have the semantics of a query
> >>>>>>>>>> fully defined.
> >>>>>>>>>> This cannot be globally done in the TableEnvironment because some
> >>>>>>>>>> queries explicitly request an expression such as the ORDER BY
> >>>>>>>>>> clause of an OVER window with PRECEDING / FOLLOWING clauses.
> >>>>>>>>>> So we need a way to specify something like the following query:
> >>>>>>>>>>
> >>>>>>>>>> SELECT
> >>>>>>>>>>     a,
> >>>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
> >>>>>>>>>>     PRECEDING AND CURRENT ROW) AS sumB
> >>>>>>>>>> FROM myStream
> >>>>>>>>>>
> >>>>>>>>>> where "proctime" indicates processing time. Equivalently,
> >>>>>>>>>> "rowtime" would indicate event time.
> >>>>>>>>>>
> >>>>>>>>>> 2) Current state
> >>>>>>>>>>
> >>>>>>>>>> The current master branch implements time support only for
> >>>>>>>>>> grouping windows in the Table API.
> >>>>>>>>>> Internally, the Table API converts a 'rowtime symbol (which looks
> >>>>>>>>>> like a regular attribute) into a special expression which
> >>>>>>>>>> indicates event-time.
> >>>>>>>>>> For example:
> >>>>>>>>>>
> >>>>>>>>>> table
> >>>>>>>>>>     .window(Tumble over 5.milli on 'rowtime as 'w)
> >>>>>>>>>>     .groupBy('a, 'w)
> >>>>>>>>>>     .select(...)
> >>>>>>>>>>
> >>>>>>>>>> defines a tumbling event-time window.
> >>>>>>>>>>
> >>>>>>>>>> Processing-time is indicated by omitting a time attribute
> >>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ).
> >>>>>>>>>>
> >>>>>>>>>> 3) How can we do that in SQL?
> >>>>>>>>>>
> >>>>>>>>>> In SQL we cannot add special expressions without touching the
> >>>>>>>>>> parser, which we don't want to do because we want to stick to the
> >>>>>>>>>> SQL standard.
> >>>>>>>>>> Therefore, I see only two options: adding system attributes or
> >>>>>>>>>> (parameterless) built-in functions. I list some pros and cons of
> >>>>>>>>>> the approaches below:
> >>>>>>>>>>
> >>>>>>>>>> 1. System Attributes:
> >>>>>>>>>> + most natural way to access a property of a record.
> >>>>>>>>>> + works with joins, because time attributes can be related to
> >>>>>>>>>>   tables
> >>>>>>>>>> - We need to ensure the attributes are not writable and always
> >>>>>>>>>>   present in streaming tables (i.e., they should be system defined
> >>>>>>>>>>   attributes).
> >>>>>>>>>> - Need to adapt existing Table API expressions (will not change
> >>>>>>>>>>   the API but some parts of the internal translation)
> >>>>>>>>>> - Event time value must be set when the stream is converted,
> >>>>>>>>>>   processing time is evaluated on the fly
> >>>>>>>>>>
> >>>>>>>>>> 2. Built-in Functions
> >>>>>>>>>> + Users could try to modify time attributes, which is not possible
> >>>>>>>>>>   with functions
> >>>>>>>>>> - do not work with joins, because we need to address different
> >>>>>>>>>>   relations
> >>>>>>>>>> - not a natural way to access a property of a record
> >>>>>>>>>>
> >>>>>>>>>> I think the only viable choice is system attributes, because
> >>>>>>>>>> built-in functions cannot be used for joins.
> >>>>>>>>>> However, system attributes are the more complex solution because
> >>>>>>>>>> they need a better integration with Calcite's SQL validator
> >>>>>>>>>> (preventing user attributes which are named rowtime for instance).
> >>>>>>>>>>
> >>>>>>>>>> Since there are currently several contributions on the way
> >>>>>>>>>> (such as SQL OVER windows FLINK-5653 to FLINK-5658) that need time
> >>>>>>>>>> indicators, we need a solution soon to be able to make progress.
> >>>>>>>>>> There are two PRs, #3252 and #3271, which implement the built-in
> >>>>>>>>>> marker functions proctime() and rowtime() and which could serve
> >>>>>>>>>> as a temporary solution (since we do not work on joins yet).
> >>>>>>>>>> I would like to suggest using these functions as a starting
> >>>>>>>>>> point (once the PRs are merged) and later changing to the system
> >>>>>>>>>> attribute solution, which needs a bit more time to be implemented.
> >>>>>>>>>>
> >>>>>>>>>> I talked with Timo today about this issue and he said he would
> >>>>>>>>>> like to investigate how we can implement this as system functions
> >>>>>>>>>> properly integrated with Calcite and the SQL Validator.
> >>>>>>>>>>
> >>>>>>>>>> What do others think?
> >>>>>>>>>>
> >>>>>>>>>> Best, Fabian
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> >
>
