Hi all,

I have a question about when `rowtime` is designated. The current
design does this during the DataStream-to-Table conversion. Does this mean
that `rowtime` is only valid for source streams and cannot be
designated after a subquery? (That is why I previously considered using an
alias to designate it dynamically in SQL.)

Best,
Xingcan

On Wed, Mar 1, 2017 at 5:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Jincheng Sun,
>
> registering watermark functions for different attributes to allow each of
> them to be used in a window is an interesting idea.
>
> However, watermarks only work well if the streaming data is (almost) in
> timestamp order. Since it is not possible to sort a stream, all attributes
> that would qualify as event-time attributes need to be in almost the same
> order. I think this limits the benefits of having multiple watermark
> functions quite significantly. But maybe you have a good use case that you
> can share where multiple event-time attributes would work well.
>
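To make the ordering argument concrete, here is a minimal sketch in plain Scala (no Flink dependency; `Event`, `requiredBound`, and the sample data are made up for illustration). For each candidate time attribute it computes how far a record can arrive behind the largest timestamp seen so far, which is the smallest out-of-orderness bound a watermark would need so that no record is late. An attribute that roughly follows arrival order needs a small bound; an unrelated one needs a large bound, which would delay every window defined on it.

```scala
object OutOfOrderness {
  // Hypothetical record with two candidate event-time attributes (epoch millis).
  final case class Event(orderTime: Long, shipTime: Long)

  // Smallest bound B such that every record satisfies
  // timestamp >= (max timestamp seen before it) - B, in arrival order.
  def requiredBound(events: Seq[Event], ts: Event => Long): Long = {
    var maxSeen = Long.MinValue
    var bound = 0L
    for (e <- events) {
      val t = ts(e)
      if (t > maxSeen) maxSeen = t
      else bound = math.max(bound, maxSeen - t)
    }
    bound
  }

  // Assumed sample: arrival order roughly follows orderTime, but not shipTime.
  val sample: Seq[Event] = Seq(
    Event(orderTime = 1000L, shipTime = 9000L),
    Event(orderTime = 1200L, shipTime = 3000L),
    Event(orderTime = 1100L, shipTime = 7000L),
    Event(orderTime = 1500L, shipTime = 2000L)
  )

  def main(args: Array[String]): Unit = {
    println(s"bound needed for orderTime: ${requiredBound(sample, _.orderTime)}")
    println(s"bound needed for shipTime:  ${requiredBound(sample, _.shipTime)}")
  }
}
```

With this sample the second attribute needs a bound seventy times larger than the first, even though both are valid timestamps of the same records.
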
> So far our approach has been that a DataStream which is converted into a
> Table already has timestamps and watermarks assigned. We also assumed that
> a StreamTableSource would provide watermarks and timestamps and indicate
> the name of the attribute that carries the timestamp.
>
> @Stefano: That's great news. I'd suggest opening a pull request and having
> a look at PR #3397, which handles the (partitioned) unbounded case. It would
> be good to share some code between these approaches.
>
> Thanks, Fabian
>
> 2017-02-28 18:17 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>:
>
> > Hi all,
> >
> > I have completed a first implementation that works for the SQL query
> > SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2
> > PRECEDING AND CURRENT ROW) AS sumB FROM MyTable
> >
> > I have SUM, MAX, MIN, AVG, and COUNT implemented, but so far I have only
> > been able to test them on simple queries such as the one above. Is there
> > any specific case I should be looking at?
> >
> > Regards,
> > Stefano
> >
> > -----Original Message-----
> > From: jincheng sun [mailto:sunjincheng...@gmail.com]
> > Sent: Tuesday, February 28, 2017 12:26 PM
> > To: dev@flink.apache.org
> > Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> > processing time
> >
> > Hi everyone, thanks for sharing your thoughts. I really like Timo's
> > proposal, and I have a few thoughts I want to share.
> >
> > We want to keep the query the same for batch and streaming. IMO,
> > "processing time" is something special to DataStream, while it is not a
> > well-defined term for batch queries. So we are fairly free to create
> > something new for processing time. I think it is a good idea to add
> > proctime as a reserved keyword for SQL.
> >
> > Regarding "event time", it is well defined for batch queries. So IMO, we
> > should keep the way of defining a streaming window exactly the same as
> > for a batch window. Therefore, the column for event time is nothing
> > special, just a normal column. The major difference between batch and
> > stream is that in DataStream the event-time column must be associated
> > with a watermark function. I really like the way Timo proposed, that we
> > can select any column as rowtime. But instead of just declaring that a
> > column is a rowtime (actually I do not think we need this special
> > rowtime keyword), I think it is better to register/associate the
> > watermark function with this column when creating the table. For
> > DataStream, we will accept a rowtime column only if it has been
> > associated with a watermark function. Prototype code to illustrate how
> > this could look is shown below:
> >
> >   TableAPI:
> >      toTable(tEnv, 'a, 'b, 'c)
> >       .registeredWatermarks('a, waterMarkFunction1)
> >
> >      batchOrStreamTable
> >       .window(Tumble over 5.milli on 'a as 'w)
> >       .groupBy('w, 'b)
> >       .select('b, 'a.count as 'cnt1, 'c.sum as 'cnt2)
> >
> >   SQL:
> >     addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
> >       .registeredWatermarks('a, waterMarkFunction1)
> >
> >     SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2
> >       PRECEDING AND CURRENT ROW) AS sumB FROM MyTable
> >
> > What do you think?
> >
> > 2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>:
> >
> > > Hi everyone,
> > >
> > > I have created an issue [1] to track the progress of this topic. I have
> > > written a short design document [2] on how we could implement the
> > > indicators and which parts have to be touched. I would suggest
> > > implementing a prototype, also to see what is possible and what can be
> > > integrated in both Flink and Calcite. Feedback is welcome.
> > >
> > > Regards,
> > > Timo
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-5884
> > > [2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing
> > >
> > >
> > >
> > > On 21/02/17 at 15:06, Fabian Hueske wrote:
> > >
> > > Hi Xingcan,
> > >>
> > >> thanks for your thoughts.
> > >> In principle you are right that the monotone attribute property would
> > >> be sufficient; however, there are more aspects to consider than that.
> > >>
> > >> Flink is a parallel stream processing engine, which means that data is
> > >> processed in separate processes and shuffled across them.
> > >> Maintaining a strict order when merging parallel streams would be
> > >> prohibitively expensive.
> > >> Flink's watermark mechanism helps operators deal with out-of-order
> > >> data (due to out-of-order input or shuffles).
> > >> I don't think we can separate the discussion about time attributes
> > >> from watermarks if we want to use Flink as a processing engine and
> > >> not reimplement large parts from scratch.
> > >>
> > >> When transforming a time attribute, we have to either align it with
> > >> existing watermarks or generate new watermarks.
> > >> If we want to allow all kinds of monotone transformations, we have to
> > >> adapt the watermarks, which is not trivial.
> > >> Instead, I think we should initially only allow very few monotone
> > >> transformations which are aligned with the existing watermarks. We
> > >> might later relax this condition if we see that users request this
> > >> feature.
> > >>
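As a minimal sketch of why transformed time attributes need care (plain Scala, no Flink dependency; the names, the lateness rule, and the numbers are illustrative assumptions): flooring a timestamp to the full hour is a monotone transformation, yet a record that is on time with respect to the original watermark can look late once its timestamp has been floored. Applying the same monotone function to the watermark is one way to keep the two aligned.

```scala
object MonotoneTransform {
  val HourMillis: Long = 60L * 60L * 1000L

  // Monotone (non-decreasing) transformation: round down to the full hour.
  def floorToHour(ts: Long): Long = ts - Math.floorMod(ts, HourMillis)

  // Simplified rule for this sketch: a record is late if its timestamp
  // is below the current watermark.
  def isLate(ts: Long, watermark: Long): Boolean = ts < watermark

  def main(args: Array[String]): Unit = {
    val watermark = 2 * HourMillis + 10 * 60 * 1000L // 02:10
    val ts = 2 * HourMillis + 15 * 60 * 1000L        // 02:15

    // Original timestamp against the original watermark.
    println(isLate(ts, watermark))
    // Transformed timestamp against the unchanged watermark.
    println(isLate(floorToHour(ts), watermark))
    // Transformed timestamp against the equally transformed watermark.
    println(isLate(floorToHour(ts), floorToHour(watermark)))
  }
}
```

Only the middle case reports the record as late, which is the misalignment that restricting the allowed transformations is meant to avoid.
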
> > >> You are right that we need to track which attribute can be used as a
> > >> time attribute (i.e., is increasing and guarded by watermarks).
> > >> For that we need to expose the time attribute when a Table is created
> > >> (either when a DataStream is converted, like
> > >> stream.toTable(tEnv, 'a, 'b, 't.rowtime), or in a StreamTableSource)
> > >> and track how it is used in queries.
> > >> I am not sure if the monotone property would be the right choice
> > >> here, since data is only quasi-monotone and a monotone annotation
> > >> might trigger some invalid optimizations which change the semantics of
> > >> a query.
> > >> Right now, Calcite does not offer a quasi-monotone property (at least
> > >> I haven't found it).
> > >>
> > >> Best, Fabian
> > >>
> > >>
> > >> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
> > >>
> > >> Hi all,
> > >>>
> > >>> As I said in another thread, the main difference between a stream and
> > >>> a table is that a stream is an ordered list while a table is an
> > >>> unordered set.
> > >>>
> > >>> If we ignore the out-of-order problem that occurs in practice, both
> > >>> event time and processing time can simply be treated as a
> > >>> monotonically increasing field, and that is why the given query [1]
> > >>> would work. In other words, we must guarantee that the
> > >>> "SELECT MAX(t22.rowtime) ..." subquery returns a single value that
> > >>> can be retrieved from the cached dynamic table, since it is dangerous
> > >>> to join two un-windowed streams.
> > >>>
> > >>> Under these circumstances, I would consider adding a "monotonic
> > >>> hint" (INC or DEC) to the field of a (generalized) table (maybe
> > >>> using an annotation on the registerDataXX method) that can be used
> > >>> to indicate whether a field is monotonically increasing or
> > >>> decreasing. Then, by treating rowtime as an ordinary (monotonically
> > >>> increasing) field, there are several benefits:
> > >>>
> > >>> 1) This can unify tables and streams by imposing a total ordering
> > >>> relation on an unordered set.
> > >>>
> > >>> 2) These fields can be modified arbitrarily as long as they keep the
> > >>> declared monotonic property, and the watermark problem no longer
> > >>> exists.
> > >>>
> > >>> 3) The monotonic hint will be useful in the query optimization
> > >>> process.
> > >>>
> > >>> What do you think?
> > >>>
> > >>> Best,
> > >>> Xingcan
> > >>>
> > >>> [1]
> > >>> SELECT t1.amount, t2.rate
> > >>> FROM
> > >>>    table1 AS t1,
> > >>>    table2 AS t2
> > >>> WHERE
> > >>>    t1.currency = t2.currency AND
> > >>>    t2.rowtime = (
> > >>>      SELECT MAX(t22.rowtime)
> > >>>      FROM table2 AS t22
> > >>>      WHERE t22.currency = t1.currency
> > >>>        AND t22.rowtime <= t1.rowtime)
> > >>>
> > >>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com>
> > >>> wrote:
> > >>>
> > >>> Hi everybody,
> > >>>>
> > >>>> When Timo wrote to the Calcite mailing list, Julian Hyde replied
> > >>>> and gave good advice and explained why a system attribute for
> > >>>> event-time would be a problem [1].
> > >>>> I thought about this and agree with Julian.
> > >>>>
> > >>>> Here is a document that describes the problem, the constraints in
> > >>>> Flink, and a proposal for how to handle processing time and event
> > >>>> time in the Table API and SQL:
> > >>>>
> > >>>> -> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ
> > >>>>
> > >>>> Please have a look, comment and ask questions.
> > >>>>
> > >>>> Thank you,
> > >>>> Fabian
> > >>>>
> > >>>> [1]
> > >>>> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
> > >>>>
> > >>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> > >>>>
> > >>>> Thanks everybody for the comments.
> > >>>>>
> > >>>>> Actually, I think we do not have much choice when deciding whether
> > >>>>> to use attributes or functions.
> > >>>>> Consider the following join query:
> > >>>>>
> > >>>>> SELECT t1.amount, t2.rate
> > >>>>> FROM
> > >>>>>    table1 AS t1,
> > >>>>>    table2 AS t2
> > >>>>> WHERE
> > >>>>>    t1.currency = t2.currency AND
> > >>>>>    t2.rowtime = (
> > >>>>>      SELECT MAX(t22.rowtime)
> > >>>>>      FROM table2 AS t22
> > >>>>>      WHERE t22.currency = t1.currency
> > >>>>>        AND t22.rowtime <= t1.rowtime)
> > >>>>>
> > >>>>> The query joins two streaming tables. Table 1 is a streaming table
> > >>>>> with amounts in a certain currency. Table 2 is a (slowly changing)
> > >>>>> streaming table of currency exchange rates.
> > >>>>> We want to join the amounts stream with the exchange rate of the
> > >>>>> corresponding currency that is valid (i.e., last received value ->
> > >>>>> MAX(rowtime)) at the rowtime of the amounts row.
> > >>>>> In order to specify the query, we need to refer to the rowtime of
> > >>>>> the different tables. Hence, we need a way to relate the rowtime
> > >>>>> expression (or marker) to a table.
> > >>>>> This is not possible with a parameterless scalar function.
> > >>>>>
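The intended semantics of this join can be sketched on in-memory collections in plain Scala (no Flink dependency; `Amount`, `Rate`, `joinLatestRate`, and the sample rows are hypothetical names for illustration). For each amount row, the sketch picks the rate of the same currency with the greatest rowtime that is not later than the amount's rowtime, which is what the correlated MAX(rowtime) subquery is meant to express. Note that it has to address the rowtime of each table separately, which is the point of the argument above.

```scala
object LatestRateJoin {
  final case class Amount(currency: String, amount: Double, rowtime: Long)
  final case class Rate(currency: String, rate: Double, rowtime: Long)

  // For each amount, find the rate of the same currency whose rowtime is the
  // maximum among those <= the amount's rowtime. Amounts without such a rate
  // are dropped, as in an inner join. Ties on rowtime are not handled here.
  def joinLatestRate(amounts: Seq[Amount], rates: Seq[Rate]): Seq[(Double, Double)] =
    amounts.flatMap { a =>
      val candidates =
        rates.filter(r => r.currency == a.currency && r.rowtime <= a.rowtime)
      if (candidates.isEmpty) None
      else Some((a.amount, candidates.maxBy(_.rowtime).rate))
    }

  // Assumed sample data.
  val rates: Seq[Rate] = Seq(
    Rate("EUR", 1.10, rowtime = 100L),
    Rate("EUR", 1.20, rowtime = 200L),
    Rate("USD", 1.00, rowtime = 50L)
  )
  val amounts: Seq[Amount] = Seq(
    Amount("EUR", 10.0, rowtime = 150L), // sees the EUR rate from t=100
    Amount("EUR", 20.0, rowtime = 250L), // sees the EUR rate from t=200
    Amount("USD", 5.0, rowtime = 40L),   // no USD rate known yet
    Amount("USD", 7.0, rowtime = 60L)    // sees the USD rate from t=50
  )

  def main(args: Array[String]): Unit =
    joinLatestRate(amounts, rates).foreach(println)
}
```

On a real stream the same lookup additionally has to wait for watermarks before it can be sure that no earlier rate is still in flight; the sketch ignores that and works on complete collections.
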
> > >>>>> I'd like to comment on the concerns regarding the performance:
> > >>>>> In fact, the columns could be completely virtual and only exist
> > >>>>> during query parsing and validation.
> > >>>>> During execution, we can directly access the rowtime metadata of a
> > >>>>> Flink streaming record (which is present anyway) or look up the
> > >>>>> current processing time from the machine clock. So the processing
> > >>>>> overhead would actually be the same as with a marker function.
> > >>>>>
> > >>>>> Regarding the question of what should be allowed with a system
> > >>>>> attribute: IMO, it could be used like any other attribute. We need
> > >>>>> it at least in GROUP BY, ORDER BY, and WHERE to define windows and
> > >>>>> joins. We could also allow access to it in SELECT if we want to
> > >>>>> give users access to rowtime and processing time. So @Haohui, your
> > >>>>> query could be supported.
> > >>>>> However, what would not be allowed is to modify the time value of
> > >>>>> a row, e.g., by naming another column rowtime: "SELECT
> > >>>>> sometimestamp AS rowtime" would not be allowed, because Flink does
> > >>>>> not support modifying the event time of a row (for good reasons)
> > >>>>> and processing time should not be modifiable anyway.
> > >>>>>
> > >>>>> @Timo:
> > >>>>> I think the approach of using the system columns only during
> > >>>>> parsing and validation and converting them to expressions
> > >>>>> afterwards makes a lot of sense.
> > >>>>> The question is how this approach could be nicely integrated with
> > >>>>> Calcite.
> > >>>>
> > >>>>> Best, Fabian
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >>>>>
> > >>>>> Hi,
> > >>>>>>
> > >>>>>> My initial thought would be that it makes more sense to have
> > >>>>>> procTime() and rowTime() only as functions which are in fact to
> > >>>>>> be used as markers. Having the value (even from special system
> > >>>>>> attributes) does not make sense in some scenarios, such as the
> > >>>>>> ones for creating windows. E.g., if you have
> > >>>>>> SELECT Count(*) OVER (ORDER BY procTime()...)
> > >>>>>> and you get the value of procTime, you cannot do anything with
> > >>>>>> it, as you need the marker to know how to construct the window
> > >>>>>> logic.
> > >>>>>>
> > >>>>>> However, your final idea to "implement some rule/logic that
> > >>>>>> translates the attributes to special RexNodes internally" is, I
> > >>>>>> believe, good and gives a solution to both problems. On the one
> > >>>>>> hand, for those scenarios where you need the value, you can
> > >>>>>> access the value, while for others you can see the special type
> > >>>>>> of the RexNode and use it as a marker.
> > >>>>>>
> > >>>>>> Regarding keeping this data in a table... I am not sure, as you
> > >>>>>> would say we need to augment the data with two fields whether
> > >>>>>> needed or not... this is not necessarily very efficient.
> > >>>>>>
> > >>>>>>
> > >>>>>> Dr. Radu Tudoran
> > >>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> > >>>>>>
> > >>>>>>
> > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > >>>>>> European Research Center
> > >>>>>> Riesstrasse 25, 80992 München
> > >>>>>>
> > >>>>>> E-mail: radu.tudo...@huawei.com
> > >>>>>> Mobile: +49 15209084330
> > >>>>>> Telephone: +49 891588344173
> > >>>>>>
> > >>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > >>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > >>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > >>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > >>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > >>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > >>>>>> This e-mail and its attachments contain confidential information
> > >>>>>> from HUAWEI, which is intended only for the person or entity whose
> > >>>>>> address is listed above. Any use of the information contained
> > >>>>>> herein in any way (including, but not limited to, total or partial
> > >>>>>> disclosure, reproduction, or dissemination) by persons other than
> > >>>>>> the intended recipient(s) is prohibited. If you receive this
> > >>>>>> e-mail in error, please notify the sender by phone or email
> > >>>>>> immediately and delete it!
> > >>>>>>
> > >>>>>> -----Original Message-----
> > >>>>>> From: Timo Walther [mailto:twal...@apache.org]
> > >>>>>> Sent: Wednesday, February 15, 2017 9:33 AM
> > >>>>>> To: dev@flink.apache.org
> > >>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> > >>>>>> processing time
> > >>>>>>
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> at first I also thought that built-in functions (rowtime() and
> > >>>>>> proctime()) are the easiest solution. However, I think to be
> > >>>>>> future-proof we should make them system attributes, especially to
> > >>>>>> relate them to a corresponding table in case of multiple tables.
> > >>>>>> Logically they are attributes of each row, which is already done
> > >>>>>> in the Table API.
> > >>>>>>
> > >>>>>> I will ask on the Calcite ML if there is a good way of integrating
> > >>>>>> system attributes. Right now, I would propose the following
> > >>>>>> implementation:
> > >>>>>>
> > >>>>>> - we introduce a custom row type (extending RelDataType)
> > >>>>>> - in a streaming environment every row has two attributes by
> > >>>>>>   default (rowtime and proctime)
> > >>>>>> - we do not allow creating a row type with those attributes (this
> > >>>>>>   should already prevent `SELECT field AS rowtime FROM ...`)
> > >>>>>> - we need to ensure that these attributes are not part of an
> > >>>>>>   expansion like `SELECT * FROM ...`
> > >>>>>> - implement some rule/logic that translates the attributes to
> > >>>>>>   special RexNodes internally, such that the optimizer does not
> > >>>>>>   modify these attributes
> > >>>>>>
> > >>>>>> What do you think?
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Timo
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On 15/02/17 at 03:36, Xingcan Cui wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> thanks for this thread.
> > >>>>>>>
> > >>>>>>> @Fabian If I didn't miss the point, the main difference between
> > >>>>>>> the two approaches is whether or not these time attributes are
> > >>>>>>> treated as common table fields that are directly available to
> > >>>>>>> users. Either way, these time attributes should be attached to
> > >>>>>>> records (right?), and the discussion is about whether to give
> > >>>>>>> them public qualifiers like other common fields, or private
> > >>>>>>> qualifiers and related get/set methods.
> > >>>>>>>
> > >>>>>>> The former (system attributes) approach will be more compatible
> > >>>>>>> with existing SQL read-only operations (e.g., select, join), but
> > >>>>>>> we need to add restrictions on SQL modification operations (like
> > >>>>>>> what?). I think there is no need to forbid users from modifying
> > >>>>>>> these attributes via Table APIs (like a map function). Just
> > >>>>>>> inform them about these special attribute names, like the
> > >>>>>>> system built-in aggregator names in iterations.
> > >>>>>>>
> > >>>>>>> As for the built-in function approach, I don't know if, for now,
> > >>>>>>> there are functions applied to a single row (maybe the value
> > >>>>>>> access functions like COMPOSITE.get(STRING)?). It seems that
> > >>>>>>> most of the built-in functions work on a single field or on
> > >>>>>>> columns, and thus it will be a mountain of work if we want to
> > >>>>>>> add a new kind of function to SQL. Maybe all existing operations
> > >>>>>>> would have to be modified to support it.
> > >>>>>>>
> > >>>>>>> All in all, if there is existing support for single-row
> > >>>>>>> functions, I prefer the built-in function approach. Otherwise
> > >>>>>>> the system attributes approach should be better. After all,
> > >>>>>>> there are not so many modification operations in SQL, and maybe
> > >>>>>>> we can use aliases to support setting time attributes (just a
> > >>>>>>> hypothesis, not sure if it's feasible).
> > >>>>>>>
> > >>>>>>> @Haohui I think the given query is valid if we add an aggregate
> > >>>>>>> function to (PROCTIME() - ROWTIME()) / 1000, and it should be
> > >>>>>>> executed efficiently.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Xingcan
> > >>>>>>>
> > >>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> Thanks for starting the discussion. I can see there are multiple
> > >>>>>>>> trade-offs in these two approaches. One question I have is to
> > >>>>>>>> what extent Flink wants to open its APIs to allow users to
> > >>>>>>>> access both processing and event time.
> > >>>>>>>>
> > >>>>>>>> Before we talk about joins, my understanding is that the two
> > >>>>>>>> approaches that you mentioned are essentially (1) treating the
> > >>>>>>>> value of event / processing time as first-class fields for each
> > >>>>>>>> row, (2) limiting the scope of time indicators to only
> > >>>>>>>> specifying windows. Take the following query as an example:
> > >>>>>>>>
> > >>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table
> > >>>>>>>> GROUP BY FLOOR(PROCTIME() TO MINUTES)
> > >>>>>>>>
> > >>>>>>>> There are several questions we can ask:
> > >>>>>>>>
> > >>>>>>>> (1) Is it a valid query?
> > >>>>>>>> (2) How efficient will the query be?
> > >>>>>>>>
> > >>>>>>>> For this query I can see arguments from both sides. I think at
> > >>>>>>>> the end of the day it really comes down to what Flink wants to
> > >>>>>>>> support. After working on FLINK-5624 I'm more inclined to
> > >>>>>>>> support the second approach (i.e., built-in functions). The main
> > >>>>>>>> reason is that the APIs of Flink are designed to separate times
> > >>>>>>>> from the real payloads. It probably makes sense for the Table /
> > >>>>>>>> SQL APIs to have the same design.
> > >>>>>>>>
> > >>>>>>>> For joins I don't have a clear answer off the top of my head.
> > >>>>>>>> Flink requires two streams to be put in the same window before
> > >>>>>>>> doing the joins. This is essentially a subset of what SQL can
> > >>>>>>>> express. I don't know what would be the best approach here.
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>> Haohui
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> It would look like the query I gave as an example before:
> > >>>>>>>>>
> > >>>>>>>>> SELECT
> > >>>>>>>>>     a,
> > >>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
> > >>>>>>>>>       PRECEDING AND CURRENT ROW) AS sumB
> > >>>>>>>>> FROM myStream
> > >>>>>>>>>
> > >>>>>>>>> Here "proctime" would be a system attribute of the table
> > >>>>>>>>> "myStream". The table would also have another system attribute
> > >>>>>>>>> called "rowtime" which would be used to indicate event time
> > >>>>>>>>> semantics.
> > >>>>>>>>> These attributes would always be present in tables which are
> > >>>>>>>>> derived from streams.
> > >>>>>>>>> Because we still require that streams have timestamps and
> > >>>>>>>>> watermarks assigned (either by the StreamTableSource or
> > >>>>>>>>> somewhere downstream in the DataStream program) when they are
> > >>>>>>>>> converted into a table, there is no need to register anything.
> > >>>>>>>>>
> > >>>>>>>>> Does that answer your questions?
> > >>>>>>>>>
> > >>>>>>>>> Best, Fabian
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >>>>>>>>>
> > >>>>>>>>> Hi Fabian,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for starting the discussion. Before I give my thoughts
> > >>>>>>>>>> on this, can you please give some examples of how you would
> > >>>>>>>>>> see the option of using "system attributes"?
> > >>>>>>>>>> Do you use this when you register the stream as a table, do
> > >>>>>>>>>> you use it when you call an SQL query, do you use it when you
> > >>>>>>>>>> translate a table back to a stream / write it to a dynamic
> > >>>>>>>>>> table?
> > >>>>>>>>>>
> > >>>>>>>>>> Dr. Radu Tudoran
> > >>>>>>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> > >>>>>>>>>>
> > >>>>>>>>>> -----Original Message-----
> > >>>>>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
> > >>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM
> > >>>>>>>>>> To: dev@flink.apache.org
> > >>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event and
> > >>>>>>>>>> processing time
> > >>>>>>>>>> Hi,
> > >>>>>>>>>>
> > >>>>>>>>>> I'd like to start a discussion about how Table API / SQL
> > >>>>>>>>>> queries indicate whether an operation is done in event or
> > >>>>>>>>>> processing time.
> > >>>>>>>>>>
> > >>>>>>>>>> 1) Why do we need to indicate the time mode?
> > >>>>>>>>>>
> > >>>>>>>>>> We need to distinguish event time and processing time mode for
> > >>>>>>>>>> operations in queries in order to have the semantics of a query
> > >>>>>>>>>> fully defined.
> > >>>>>>>>>> This cannot be done globally in the TableEnvironment because
> > >>>>>>>>>> some queries explicitly request an expression, such as the
> > >>>>>>>>>> ORDER BY clause of an OVER window with PRECEDING / FOLLOWING
> > >>>>>>>>>> clauses.
> > >>>>>>>>>> So we need a way to specify something like the following query:
> > >>>>>>>>>>
> > >>>>>>>>>> SELECT
> > >>>>>>>>>>     a,
> > >>>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
> > >>>>>>>>>>       PRECEDING AND CURRENT ROW) AS sumB
> > >>>>>>>>>> FROM myStream
> > >>>>>>>>>>
> > >>>>>>>>>> where "proctime" indicates processing time. Equivalently,
> > >>>>>>>>>> "rowtime" would indicate event time.
> > >>>>>>>>>>
> > >>>>>>>>>> 2) Current state
> > >>>>>>>>>>
> > >>>>>>>>>> The current master branch implements time support only for
> > >>>>>>>>>> grouping windows in the Table API.
> > >>>>>>>>>> Internally, the Table API converts a 'rowtime symbol (which
> > >>>>>>>>>> looks like a regular attribute) into a special expression
> > >>>>>>>>>> which indicates event-time.
> > >>>>>>>>>> For example:
> > >>>>>>>>>>
> > >>>>>>>>>> table
> > >>>>>>>>>>     .window(Tumble over 5.milli on 'rowtime as 'w)
> > >>>>>>>>>>     .groupBy('a, 'w)
> > >>>>>>>>>>     .select(...)
> > >>>>>>>>>>
> > >>>>>>>>>> defines a tumbling event-time window.
> > >>>>>>>>>>
> > >>>>>>>>>> Processing-time is indicated by omitting a time attribute
> > >>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ).
> > >>>>>>>>>>
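As a rough illustration of what the two variants mean (plain Scala, not the Table API; `Record`, `windowStart`, and `countPerWindow` are names made up for this sketch): a tumbling window of size 5 ms assigns each record to the interval [start, start + 5) that contains its time. In event time that time is the record's own timestamp, as below; in processing time it would be the machine clock at the moment the record is processed, so the grouping would depend on when the job runs.

```scala
object TumblingWindows {
  // Hypothetical record: a grouping key and an event timestamp in millis.
  final case class Record(key: String, time: Long)

  // Start of the tumbling window of the given size that contains `time`.
  def windowStart(time: Long, sizeMillis: Long): Long =
    time - Math.floorMod(time, sizeMillis)

  // Count records per (key, window start), i.e. groupBy('a, 'w) with a count.
  def countPerWindow(records: Seq[Record], sizeMillis: Long): Map[(String, Long), Int] =
    records
      .groupBy(r => (r.key, windowStart(r.time, sizeMillis)))
      .map { case (group, rs) => group -> rs.size }

  // Assumed sample data.
  val sample: Seq[Record] = Seq(
    Record("a", 1L), Record("a", 4L), Record("a", 5L),
    Record("b", 7L), Record("a", 12L)
  )

  def main(args: Array[String]): Unit =
    countPerWindow(sample, sizeMillis = 5L).toSeq.sorted.foreach(println)
}
```

Because the event-time variant depends only on the data, rerunning it over the same records always yields the same windows.
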
> > >>>>>>>>>> 3) How can we do that in SQL?
> > >>>>>>>>>>
> > >>>>>>>>>> In SQL we cannot add special expressions without touching the
> > >>>>>>>>>> parser, which we don't want to do because we want to stick to
> > >>>>>>>>>> the SQL standard.
> > >>>>>>>>>> Therefore, I see only two options: adding system attributes or
> > >>>>>>>>>> (parameterless) built-in functions. I list some pros and cons
> > >>>>>>>>>> of the approaches below:
> > >>>>>>>>>>
> > >>>>>>>>>> 1. System Attributes:
> > >>>>>>>>>> + most natural way to access a property of a record.
> > >>>>>>>>>> + works with joins, because time attributes can be related to
> > >>>>>>>>>>   tables
> > >>>>>>>>>> - We need to ensure the attributes are not writable and always
> > >>>>>>>>>>   present in streaming tables (i.e., they should be system
> > >>>>>>>>>>   defined attributes).
> > >>>>>>>>>> - Need to adapt existing Table API expressions (will not change
> > >>>>>>>>>>   the API but some parts of the internal translation)
> > >>>>>>>>>> - Event time value must be set when the stream is converted,
> > >>>>>>>>>>   processing time is evaluated on the fly
> > >>>>>>>>>>
> > >>>>>>>>>> 2. Built-in Functions
> > >>>>>>>>>> + Users could try to modify time attributes, which is not
> > >>>>>>>>>>   possible with functions
> > >>>>>>>>>> - do not work with joins, because we need to address different
> > >>>>>>>>>>   relations
> > >>>>>>>>>> - not a natural way to access a property of a record
> > >>>>>>>>>>
> > >>>>>>>>>> I think the only viable choice is system attributes, because
> > >>>>>>>>>> built-in functions cannot be used for joins.
> > >>>>>>>>>> However, system attributes are the more complex solution
> > >>>>>>>>>> because they need a better integration with Calcite's SQL
> > >>>>>>>>>> validator (preventing user attributes which are named rowtime,
> > >>>>>>>>>> for instance).
> > >>>>>>>>>>
> > >>>>>>>>>> Since there are currently several contributions on the way
> > >>>>>>>>>> (such as SQL OVER windows, FLINK-5653 to FLINK-5658) that need
> > >>>>>>>>>> time indicators, we need a solution soon to be able to make
> > >>>>>>>>>> progress.
> > >>>>>>>>>> There are two PRs, #3252 and #3271, which implement the
> > >>>>>>>>>> built-in marker functions proctime() and rowtime() and which
> > >>>>>>>>>> could serve as a temporary solution (since we do not work on
> > >>>>>>>>>> joins yet).
> > >>>>>>>>>> I would like to suggest using these functions as a starting
> > >>>>>>>>>> point (once the PRs are merged) and later changing to the
> > >>>>>>>>>> system attribute solution, which needs a bit more time to be
> > >>>>>>>>>> implemented.
> > >>>>>>>>>>
> > >>>>>>>>>> I talked with Timo today about this issue and he said he would
> > >>>>>>>>>> like to investigate how we can implement this as system
> > >>>>>>>>>> functions properly integrated with Calcite and the SQL
> > >>>>>>>>>> Validator.
> > >>>>>>>>>>
> > >>>>>>>>>> What do others think?
> > >>>>>>>>>>
> > >>>>>>>>>> Best, Fabian
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>
> > >
> >
>
