Hi Fabian,

I have a question similar to Jark's. In theory, the row times of two streams could be quite different, e.g., one stream carries today's timestamps and the other yesterday's. How can we align them?
Best,
Xingcan

On Mon, Jul 31, 2017 at 9:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Jark,
>
> yes, the handling of watermarks is very tricky. It is not directly related
> to the proposal, which is only about the representation of timestamps, but
> it becomes important for event-time joins.
> We have a JIRA about an operator that is able to hold back watermarks [1].
>
> Roughly, the idea is to track the smallest timestamp that will be emitted in
> the future and align the watermark to this timestamp.
> For this we need to know the semantics of the operator (which timestamp
> will be emitted in the future), but this will be given for relational
> operators.
> The new operator could emit a watermark whenever it receives one.
>
> In the case of a join, the smallest future timestamp depends on two fields
> and not just on one.
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-7245
>
> 2017-07-31 14:35 GMT+02:00 Jark Wu <j...@apache.org>:
>
> > Hi,
> >
> > @Fabian, I read your proposal carefully again, and I'm a big +1 to do it.
> > The proposal addresses the problem of how to forward the rowtime of both
> > input tables of a dual-stream join (windowed/non-windowed). The drawback
> > of the additional payload is acceptable.
> >
> > You mentioned that:
> >
> > > The query operators ensure that the watermarks are always behind all
> > > event-time timestamps. With additional analysis we will be able to
> > > restrict this to timestamps that are actually used as such.
> >
> > I'm more curious about how we can define the watermark strategies in
> > order to make sure all timestamp columns are aligned to watermarks,
> > especially when the watermark has already been defined in the input
> > DataStream.
> >
> > Bests,
> > Jark Wu
> >
> > 2017-07-27 23:13 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
> >
> > > Hi all,
> > >
> > > Thanks for the answers, @Fabian.
> > > > > > @Jark, at first I also wanted the users to reassign the timestamp field > > > arbitrarily. However, that means we have to break the current "time > > system" > > > and create a new one. The blocked watermarks become meaningless and > > maybe a > > > new WatermarkAssigner should be provided. A little more strict > mechanism > > > would be only allowing to use the existing timestamp fields. It sounds > > > reasonable, but will bring an unnecessary barrier to stream/batch SQL, > > i.e. > > > some SQL works for the batch can not be executed in the stream > > environment. > > > I just wonder if we could automatically choose a field, which will be > > used > > > in the following calculations. Not sure if it makes sense. > > > > > > @Shaoxuan @Radu, I totally agree that the "proctime" is the main block > > for > > > consolidating stream/batch SQL. Though from a general point of view, it > > can > > > indicate the time to some extent, the randomness property determines > that > > > it should never be used in time-sensitive applications. I always > believe > > in > > > that all the information used for query evaluation should be acquired > > from > > > data itself. > > > > > > Best, > > > Xingcan > > > > > > On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <fhue...@gmail.com> > > wrote: > > > > > > > Hi Shaoxuan, > > > > > > > > thanks for your comments. I agree with your comment: > > > > > > > > > The problem we used to have is that we have treated eventtime > column > > > as a > > > > special timestamp column. > > > > > > > > IMO, an event-time timestamp column is a regular column that is > aligned > > > > with the watermarks of the stream. > > > > In order to distinguish watermark aligned columns from others, we > need > > a > > > > special flag in the schema. > > > > When a timestamp column is modified and we cannot guarantee that is > it > > > > still aligned with the watermarks, it must lose the special flag and > be > > > > treated like any other column. 
> > > > > > > > Regarding your comments: > > > > 1) I agree, that we can use Long in addition to Timestamp as a > > timestamp > > > > columns. Since timestamp columns need to be comparable to watermarks > > > which > > > > are Longs, I don't see that other types would make sense. For now, I > > > would > > > > keep the restriction that timestamps can only be of Timestamp type. I > > > > think, extending this to Long would be a follow-up issue to the > > changes I > > > > proposed here. > > > > 2) Relates to 1) and I agree. if we use a Long attribute as timestamp > > it > > > > should remain of type Long. For now I would keep converting it to > > > Timestamp > > > > and change that later. > > > > 3) Yes, timestamp columns must be aligned to watermarks. That's their > > > > primary characteristic. How to define watermark strategies is > > orthogonal > > > to > > > > this discussion, IMO. > > > > 4) From my point of view, proc-time is a purely virtual column and > not > > > > related to an actual (data) column. However, it must be part of the > > > schema > > > > and treated like any other attribute for a good user experience and > SQL > > > > compliance. In order to be able to join two tables on processing > time, > > it > > > > must be possible to include a processing time column in the schema > > > > definition of the table. Processing time queries can never compute > the > > > same > > > > results as batch queries but their semantics should be aligned with > > > > event-time queries. > > > > > > > > Best, Fabian > > > > > > > > 2017-07-27 9:47 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>: > > > > > > > > > Hi all, > > > > > > > > > > @Shaoxuan - thanks for the remarks. I have a question regarding > your > > > > > suggestion not to consider to create proctime window in a regular > > > > column. I > > > > > think this would be useful though. 
First you might need to carry > the > > > > > timestamp indicator of when the processing happened (for log > > purposes, > > > > > provenance, traceability ...). Secondly - I do not think it is > > > > > contradicting with the semantics in batch SQL as in SQL you have > the > > > > > function "now()" ...which pretty much carry the same semantics as > > > having > > > > a > > > > > function to mark the proctime and then projecting this into a > column. > > > If > > > > I > > > > > am not mistaken you can introduce in database columns the result of > > > > calling > > > > > now(). > > > > > > > > > > > > > > > Dr. Radu Tudoran > > > > > Staff Research Engineer - Big Data Expert > > > > > IT R&D Division > > > > > > > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > > > > German Research Center > > > > > Munich Office > > > > > Riesstrasse 25, 80992 München > > > > > > > > > > E-mail: radu.tudo...@huawei.com > > > > > Mobile: +49 15209084330 > > > > > Telephone: +49 891588344173 > > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > > > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB > 56063, > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB > 56063, > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang > > > > > This e-mail and its attachments contain confidential information > from > > > > > HUAWEI, which is intended only for the person or entity whose > address > > > is > > > > > listed above. Any use of the information contained herein in any > way > > > > > (including, but not limited to, total or partial disclosure, > > > > reproduction, > > > > > or dissemination) by persons other than the intended recipient(s) > is > > > > > prohibited. If you receive this e-mail in error, please notify the > > > sender > > > > > by phone or email immediately and delete it! 
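Fabian's point earlier in this reply chain, that a watermark-aligned timestamp column carries a special flag in the schema and must lose it once the column is modified, can be illustrated with a small sketch. This is plain Python, not Flink code: the names `Column`, `project`, and `is_rowtime` are hypothetical and only model the idea, and the column names are borrowed from the order example elsewhere in the thread.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Column:
    name: str
    type_name: str
    is_rowtime: bool = False  # hypothetical "aligned with watermarks" flag


# A projection maps an output name to (input column name, optional transform).
Projection = Dict[str, Tuple[str, Optional[Callable[[int], int]]]]


def project(schema: List[Column], projection: Projection) -> List[Column]:
    """Derive the output schema of a projection.

    A column that is forwarded unchanged keeps its rowtime flag. A column
    that passes through any expression loses it, because alignment with the
    watermarks can no longer be guaranteed.
    """
    by_name = {c.name: c for c in schema}
    out = []
    for out_name, (in_name, transform) in projection.items():
        src = by_name[in_name]
        keeps_flag = src.is_rowtime and transform is None
        out.append(Column(out_name, src.type_name, keeps_flag))
    return out


orders = [
    Column("orderId", "Long"),
    Column("orderTime", "Timestamp", is_rowtime=True),
    Column("shipTime", "Timestamp", is_rowtime=True),
]

result = project(orders, {
    "orderTime": ("orderTime", None),                        # forwarded as-is
    "shipPlusHour": ("shipTime", lambda t: t + 3_600_000),   # modified
})

for col in result:
    print(col.name, col.is_rowtime)
```

In this model only `orderTime` could still be used for an event-time window downstream; `shipPlusHour` is treated like any other column.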
> > > > > > > > > > > > > > > -----Original Message----- > > > > > From: Shaoxuan Wang [mailto:shaox...@apache.org] > > > > > Sent: Thursday, July 27, 2017 6:00 AM > > > > > To: Dev > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling > > > > > > > > > > Hi Everyone, > > > > > I like this proposal. The problem we used to have is that we have > > > treated > > > > > eventtime column as a special timestamp column. An eventtime column > > is > > > > > nothing special than all other regular columns, but with a certain > > flag > > > > > (eventtime-indicator) inferring that this column can be used as an > > > > eventime > > > > > to decide when a bounded query can emit the final result by > comparing > > > > with > > > > > a concern associated waterMark. > > > > > > > > > > I have a few comments adding on top of this (they may have already > > been > > > > > addressed in the conversation — since It’s a long discussion, I may > > > miss > > > > > something): > > > > > > > > > > 1. While we remove timestamp column, we introduce > > > eventtime-indicator > > > > > (we may already have this concept), it is only a flag can be > > applied > > > > for > > > > > any column (note that some types may not be able to be used as > > > > eventtime > > > > > column), indicating if this column can be used as eventtime or > > not. > > > > This > > > > > flag is useful for validation and codeGen. > > > > > 2. A column that has been used as an eventtime, should not lose > > its > > > > own > > > > > type. We should not cast all eventime column to the timestamp > > type. > > > > For > > > > > instance, if a column is a long type, it will keep as long type > > even > > > > if > > > > > a > > > > > window aggregate has used it as a eventtime. > > > > > 3. Eventtime will only work well with some associated waterMark > > > > > strategy. We may consider forcing user to provide a waterMark > > logic > > > on > > > > > his/her selected eventtime. > > > > > 4. 
For proctime, I hope we should not introduce > proctime-indicator > > > for > > > > > regular column. Ideally we should not allow user to create > > proctime > > > > > window > > > > > on regular column, as this is against the batch query semantics. > > > > > Therefore > > > > > I suggest we should always introduce a proctime timestamp column > > for > > > > > users > > > > > to create proctime window. And unlike eventtime, proctime does > not > > > > need > > > > > any > > > > > associated waterMark strategy, as there is no such out of order > > > issue > > > > > for > > > > > the proctime. > > > > > > > > > > Regards, > > > > > Shaoxuan > > > > > > > > > > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhue...@gmail.com> > > > > wrote: > > > > > > > > > > > Thanks everybody for the replies so far. > > > > > > > > > > > > Let me answer your questions and reply to your thoughts: > > > > > > > > > > > > Radu: > > > > > > --- > > > > > > First of all, although my proposal is movivated by a join > operator, > > > > > > this discussion is about timestamp handling, not about joins in > > > > general. > > > > > > > > > > > > - The semantics of outer joins is to emit null and there is no > way > > > > > > around that. This is not an issue for us. Actually, outer joins > are > > > > > > supported by the batch SQL / Table API. It is true that outer > joins > > > > > > might result in null timestamps. Calcite will mark those fields > as > > > > > > nullable and we should check that timestamps which are used in > > > windows > > > > > or joins are not nullable. > > > > > > - The query has to explicitly specify which timestamp attribute > to > > > use. > > > > > > Otherwise its semantics are not complete and it is invalid. A > > > > > > group-window that follows a join will reference a timestamp > > attribute > > > > > > and this will be used. The other timestamp might be projected > out. 
> > > > > > When a result with two timestamps is converted into a DataStream, > > the > > > > > > user has to decide. This could be done inside of the Table to > > > > > > DataStream conversion. If the Table has more than one valid > > > timestamp, > > > > > > the conversion will ask which timestamp to forward. > > > > > > - A proctime join should forward all proctime attributes of the > > input > > > > > > tables. All will be the same, but that does not matter because > they > > > > > > are either virtual or represented as 1 byte dummy attributes. > Also, > > > > > > unused ones will be automatically projected out anyway. > > > > > > - An event-time join should forward all event-time attributes of > > the > > > > > > input tables. Creating a new event-time attribute using > processing > > > > > > time makes event-time processing pointless and will give > completely > > > > > random results. > > > > > > Event-time is not about the "time an event is created" but about > a > > > > > > timestamp that is associated with an event. For example an order > > > event > > > > > > could have three timestamps: "orderTime", "shipTime", and > > > > "receiveTime". > > > > > > Each could be a valid event-time attribute. > > > > > > > > > > > > Jark: > > > > > > --- > > > > > > Thanks for the proposal. I think I understand what you want to > > > achieve > > > > > > with this, but I think functions to instantiate time attributes > are > > > > > > not necessary and would make things more complicated. The point > of > > > > > > supporting multiple time attributes is to ensure that all of them > > are > > > > > > aligned with the watermarks. If we add a method > ROW_TIME(timestamp) > > > > > > and we don't know if the timestamp is aligned with the > watermarks. > > If > > > > > > that is not the case, the query won't be executed as expected. 
> The > > > > > > issue of LEFT JOIN can easily be addressed by checking for > > > > > > nullablility during optimization when an operator tries to use > it. > > > > > > > > > > > > The beauty of supporting multiple timestamps is that a user does > > not > > > > > > have to care at all about timestamps (or timestamp functions) and > > > > > > watermarks. As long as the query uses a timestamp attribute that > > was > > > > > > originally declared as rowtime in a source table (and was not > > > modified > > > > > > afterwards), this is fine. Think of a cascade of three windowed > > > joins: > > > > > > R - S - T - U, and you want to join S - T first. In that case, > you > > > > > > need to preserve the timestamps of S and T in order to join R and > > U. > > > > > > From a relational algebra point of view, there is no reason to > > have a > > > > > > limitation on how these attributes are accessed. Timestamps are > > just > > > > > > regular fields of a record. The only restriction in the context > of > > > > > > stream processing is that the watermark must be aligned with > > > > > > timestamps, i.e., follow all timestamps such that data is not > late > > > > > > according to any of the timestamps. This we can achieve and > handle > > > > > internally without the user having to worry about it. > > > > > > > > > > > > Xingcan: > > > > > > --- > > > > > > I think your questions are mostly implementation details and not > so > > > > > > much related to the original proposal of supporting multiple > > > > timestamps. > > > > > > > > > > > > My take on your questions is: > > > > > > 1. The rate at which watermarks are emitted is not important for > > the > > > > > > correctness of a query. However, it can affect the performance, > > > > > > because each watermark is sent as a special record and it is > > > > > > broadcasted. 
My initial take would be to emit a new watermark > > > whenever > > > > > > the operator updated its watermark because usually, the operator > > > would > > > > > > have forwarded the old watermark. > > > > > > 2. I would say this is the responsibility of the operator because > > > > > > first it is not related to the semantics of the query and second > it > > > is > > > > > > an operator responsibility in the existing code as well. > > > > > > > > > > > > Jark 2: > > > > > > You are right, the query (or user) must decide on the event-time > > > > > > attribute to use. My main point is, it is much easier for the > user > > > > > > (and for us > > > > > > internally) if we internally track multiple timestamps. Because > we > > do > > > > > > not have to prune the timestamp that will not be later used into > > the > > > > > join. > > > > > > Moreover, both timestamps might be used later (see join example, > > > which > > > > > > could be reordered of course). All we have to do is to ensure > that > > > all > > > > > > timestamps are aligned with the watermarks. > > > > > > > > > > > > Radu 2: > > > > > > IMO, time (or anything else that affects the semantics) should > > never > > > > > > be decided by the system. When we would do that, a query is not > > fully > > > > > > specified or, even worse, the way it is executed is semantically > > > > > > incorrect and produces arbitrary results. > > > > > > > > > > > > Time attributes should be specified in the source tables and then > > > > > > forwarded from there. So far I haven't seen an example where this > > > > > > would not be possible (within the semantics or relational > queries). > > > If > > > > > > we do that right, there won't be a need for explicit time > > management > > > > > > except for the definition of the initial timestamps which can be > > > > > > hidden in the table definition. 
As I said before, we (or the > > system) > > > > > > cannot decide on the timestamp because that would lead to > arbitrary > > > > > > results. Asking the user to do that would mean explicit time > > > > > > management which is also not desirable. I think my proposal gives > > > > > > users all options (timestamps) to chose from and the system can > do > > > the > > > > > rest. > > > > > > > > > > > > Best, Fabian > > > > > > > > > > > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com > >: > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > I just want to add that I was referring to NULL values not > > > > > > > specifically > > > > > > to > > > > > > > timefields but to the event itself. If you have the follow > > > situation > > > > > > > > > > > > > > Stream 1: .... | event1 | .... > > > > > > > Stream 2: .... | | .... > > > > > > > > > > > > > > And you have a LEFT JOIN between stream 1 and stream 2 (no > > > > > > > condition)...then you still need to emit (event1,null) ... as > > this > > > > > > > is the behavior of left join. This is maybe a very simple > > > situation, > > > > > > > but the > > > > > > point > > > > > > > is that left joins and right joins can have situation when you > > have > > > > > > > elements only in the main stream and no element in the right > > > stream. > > > > > > > And for this case you still need to emit. > > > > > > > > > > > > > > > > > > > > > Regarding whether time should be decided by system or not...i > > think > > > > > > > the answer is it depends. I think the example from Jack is very > > > good > > > > > > > and > > > > > > shows > > > > > > > the need for some mechanisms to select/manage the time (I like > > the > > > > > > proposal > > > > > > > of having functions to insert the time in the output!). 
> > > > > > > However, if a business analyst writes a query without explicit
> > > > > > > time management, we still need some default behavior in the
> > > > > > > system. As per my initial proposal, I think we need to decide
> > > > > > > on one timestamp field to carry: either a new one created at
> > > > > > > the moment of the join, or the timestamp from the main stream
> > > > > > > (although I am not sure which one is the main stream in the
> > > > > > > case of a full join :) ).
> > > > > > >
> > > > > > > Dr. Radu Tudoran
> > > > > > >
> > > > > > > -----Original Message-----
> > > > > > > From: Jark Wu [mailto:j...@apache.org]
> > > > > > > Sent: Wednesday, July 26, 2017 8:29 AM
> > > > > > > To: dev@flink.apache.org
> > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> > > > > > >
> > > > > > > Hi Xingcan,
> > > > > > >
> > > > > > > IMO, the event time of join results cannot be automatically
> > > > > > > decided by the system. Considering batch tables, if users want
> > > > > > > an event-time window aggregation after a join, they must
> > > > > > > specify the time field explicitly (T1.rowtime or T2.rowtime or
> > > > > > > a result computed from them). So in the case of streaming
> > > > > > > tables, the system also can't automatically decide the time
> > > > > > > field for users.
> > > > > > >
> > > > > > > Regarding the question you asked, I think we don't need to
> > > > > > > change the watermark no matter whether we choose the left
> > > > > > > rowtime, the right rowtime, or a combination, because the
> > > > > > > watermark has been aligned with the rowtime in the source.
> > > > > > > Maybe I'm wrong about this; please correct me if I'm missing
> > > > > > > something.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Jark
> > > > > > >
> > > > > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > @Fabian, thanks for raising this.
> > > > > > > >
> > > > > > > > @Radu and Jark, personally I think the timestamp field is
> > > > > > > > critical for query processing and thus should be declared as
> > > > > > > > (or supposed to be) NOT NULL.
In addition, I think the event-time semantic of the > > > join > > > > > > > > results should be automatically decided by the system, i.e., > we > > > do > > > > > not > > > > > > > > hand it over to users so to avoid some unpredictable > > assignment. > > > > > > > > > > > > > > > > Generally speaking, consolidating different time fields is > > > possible > > > > > > > > since all of them should ideally be monotonically increasing. > > > From > > > > my > > > > > > > > point of view, the problem lies in > > > > > > > > (1) what's the relationship between the old and new > watermarks. > > > > Shall > > > > > > > > they be one-to-one mapping or the new watermarks could skip > > some > > > > > > > > timestamps? And (2) who is in charge of emitting the blocked > > > > > > > > watermarks, the operator or the process function? > > > > > > > > > > > > > > > > I'd like to hear from you. > > > > > > > > > > > > > > > > Best, > > > > > > > > Xingcan > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <j...@apache.org> > > > wrote: > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > Radu's concerns make sense to me, especially the null value > > > > > > > > > timestamp and multi-proctime. > > > > > > > > > > > > > > > > > > I have also something in my mind. I would like to propose > > some > > > > time > > > > > > > > > indicator built-in functions, e.g. ROW_TIME(Timestamp ts) > > will > > > > > > > > > generate a event time logical attribute, PROC_TIME() will > > > > generate > > > > > a > > > > > > > > > processing time logical attribute. It is similar to > > > > TUMBLE_ROWTIME > > > > > > > > > proposed in this PR https://github.com/apache/ > > flink/pull/4199. > > > > > These > > > > > > > > > can be used in any queries, but there still can't be more > > than > > > > one > > > > > > > > > rowtime attribute or more than one proctime attribute in a > > > table > > > > > > > schema. 
> > > > > > > > > > > > > > > > > > The both selected timestamp fields from a JOIN query will > be > > > > > > > > materialized. > > > > > > > > > If someone needs further down the computation based on the > > > event > > > > > > > > > time, > > > > > > > > they > > > > > > > > > need to create a new time attribute using the ROW_TIME(...) > > > > > > > > > function. And this can also solve the null timestamp > problem > > in > > > > > LEFT > > > > > > > > > JOIN, because we > > > > > > > > can > > > > > > > > > use a user defined function to combine the two rowtimes and > > > make > > > > > the > > > > > > > > result > > > > > > > > > as the event time attribute, e.g. SELECT > > > ROW_TIME(udf(T1.rowtime, > > > > > > > > > T2.rowtime)) as rowtime FROM T1 JOIN T2 ... > > > > > > > > > > > > > > > > > > > > > > > > > > > What do you think? > > > > > > > > > > > > > > > > > > > > > > > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran < > > > radu.tudo...@huawei.com > > > > >: > > > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > > > I think this is an interesting discussion and I would > like > > to > > > > add > > > > > > > > > > some issues and give some feedback. > > > > > > > > > > > > > > > > > > > > - For supporting the join we do not only need to think of > > the > > > > > time > > > > > > > > > > but also on the null values. For example if you have a > LEFT > > > (or > > > > > > > > > > RIGHT) JOIN between items of 2 input streams, and the > > > secondary > > > > > > > > > > input is not > > > > > > > > > available > > > > > > > > > > you should still emit Row.of(event1, null)...as far as I > > know > > > > if > > > > > > > > > > you > > > > > > > > need > > > > > > > > > > to serialize/deserialize null values to send them they do > > not > > > > > > > > > > work. 
So > > > > > > > > we > > > > > > > > > > should include this scenario in the discussions -If we > will > > > > have > > > > > > > > > > multiple timestamp in an (output) event, one question > > > > > > > > is > > > > > > > > > > how to select afterwards which is the primary time field > on > > > > which > > > > > > > > > > to operate. When we describe a query we might be able to > > > > specify > > > > > > > > > > (or we > > > > > > > > get > > > > > > > > > > this implicitly if we implement the carryon of the 2 > > > > timestamps) > > > > > > > > Select > > > > > > > > > > T1.rowtime, T2.rowtime ...but if the output of a query is > > the > > > > > > > > > > input of > > > > > > > > a > > > > > > > > > > new processing pipeline, then, do we support generally > also > > > > that > > > > > > > > > > the > > > > > > > > > input > > > > > > > > > > has 2 time fields? ...how do we deal with the 2 input > > fields > > > > > > > > > > (maybe I > > > > > > > > am > > > > > > > > > > missing something) further in the datastream pipeline > that > > we > > > > > > > > > > build > > > > > > > > based > > > > > > > > > > on the output? > > > > > > > > > > - For the case of proctime - do we need to carry 2 > > proctimes > > > > (the > > > > > > > > > > proctimes of the incoming events from each stream), or 1 > > > > proctime > > > > > > > > > > (as > > > > > > > > we > > > > > > > > > > operate on proctime and the combination of the 2 inputs > can > > > be > > > > > > > > considered > > > > > > > > > > as a new event, the current proctime on the machine can > be > > > > > > > > > > considered > > > > > > > > the > > > > > > > > > > (proc)time reference for output event) or 3 proctimes > (the > > 2 > > > > > > > > > > proctimes > > > > > > > > of > > > > > > > > > > the input plus the proctime when the new event was > > created)? 
> > > > > > > > > > -Similar with the point above, for even time (which I am > > > > > > > > > > understanding > > > > > > > > as > > > > > > > > > > the time when the event was created...or do we understand > > > them > > > > as > > > > > > > > > > a > > > > > > > > time > > > > > > > > > > carry within the event?) - when we join 2 events and > output > > > an > > > > > > > > > > event > > > > > > > > that > > > > > > > > > > is the result of the join - isn't this a new event detach > > > from > > > > > the > > > > > > > > > > source\input events? ... I would tend to say it is a new > > > event > > > > > and > > > > > > > > > > then > > > > > > > > > as > > > > > > > > > > for proctime the event time of the new event is the > current > > > > time > > > > > > > > > > when > > > > > > > > > this > > > > > > > > > > output event was created. If we would accept this > > hypothesis > > > > then > > > > > > > > > > we > > > > > > > > > would > > > > > > > > > > not need the 2 time input fields to be carried/managed > > > > > implicitly. > > > > > > > > > > If someone needs further down the computation pipeline, > > then > > > in > > > > > > > > > > the query > > > > > > > > > they > > > > > > > > > > would be selected explicitly from the input stream and > > > > projected > > > > > > > > > > in > > > > > > > > some > > > > > > > > > > fields to be carried (Select T1.rowtime as FormerTime1, > > > > > T2.rowtime > > > > > > > > > > as FormerTime2, .... JOIN T1, T2...)...but they would not > > > have > > > > > the > > > > > > > > timestamp > > > > > > > > > > logic > > > > > > > > > > > > > > > > > > > > ..my 2 cents > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Dr. 
Radu Tudoran
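Radu's LEFT JOIN concern and the question of which timestamp to carry can be made concrete with a toy example. This is a plain-Python sketch, not Flink code: `left_join`, `usable_as_time_attribute`, and the tuple layout are assumptions made for illustration. It emits a row with a null right side when there is no partner, and it forwards both rowtimes as regular fields, so a later operator has to pick one explicitly and can check it for nulls first.

```python
from typing import List, Optional, Tuple

# (key, rowtime in ms): a deliberately tiny stand-in for a table row
Event = Tuple[str, int]
Joined = Tuple[str, int, Optional[int]]  # key, left rowtime, right rowtime


def left_join(left: List[Event], right: List[Event]) -> List[Joined]:
    """Left outer join on key that forwards both rowtime fields."""
    right_by_key = {}
    for key, ts in right:
        right_by_key.setdefault(key, []).append(ts)

    out: List[Joined] = []
    for key, left_ts in left:
        matches = right_by_key.get(key)
        if matches:
            out.extend((key, left_ts, right_ts) for right_ts in matches)
        else:
            # No partner on the right side: still emit, with a null rowtime.
            out.append((key, left_ts, None))
    return out


def usable_as_time_attribute(rows: List[Joined], field: int) -> bool:
    """A field can drive a window or join only if it is never null."""
    return all(row[field] is not None for row in rows)


stream1 = [("a", 1000), ("b", 2000)]
stream2 = [("a", 1500)]

joined = left_join(stream1, stream2)
print(joined)
print(usable_as_time_attribute(joined, 1))  # left rowtime
print(usable_as_time_attribute(joined, 2))  # right rowtime
```

Here the left rowtime stays usable after the join, while the right rowtime contains a null and would have to be rejected (or projected out) before it is used in a window.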
-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, July 25, 2017 4:22 PM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL internal timestamp handling

Hi everybody,

I'd like to propose and discuss some changes in the way the Table API / SQL internally handles timestamps.

The Table API is implemented on top of the DataStream API. The DataStream API hides timestamps from users in order to ensure that timestamps and watermarks are aligned. Instead, users assign timestamps and watermarks once (usually at the source or in a subsequent operator) and let the system handle the timestamps from there on. Timestamps are stored in the timestamp field of the StreamRecord, which is a holder for the user record and the timestamp. DataStream operators that depend on time (time-windows, process function, ...) access the timestamp from the StreamRecord.

In contrast to the DataStream API, the Table API and SQL are aware of the semantics of a query, i.e., we can analyze how users access timestamps and whether they are modified or not.
Another difference is that the timestamp must be part of the schema of a table in order to have correct query semantics.

The current design to handle timestamps is as follows. The Table API stores timestamps in the timestamp field of the StreamRecord. Therefore, timestamps are detached from the remaining data, which is stored in Row objects. Hence, the physical representation of a row is different from its logical representation. We introduced a translation layer (RowSchema) to convert the logical schema into the physical schema. This is necessary for serialization or code generation when the logical plan is translated into a physical execution plan. Processing-time timestamps are handled similarly. They are not included in the physical schema and are looked up when needed. This design also requires that we materialize timestamps when they are accessed by expressions. Timestamp materialization is done as a pre-optimization step.

While thinking about the implementation of the event-time windowed stream-stream join [1], I stumbled over the question of which timestamp of the two input tables to forward.
With the current design, we could only have a single timestamp, so keeping both timestamps would not be possible. The choice of the timestamp would need to be specified by the query, otherwise it would lack clear semantics. When executing the join, the join operator would need to make sure that no late data is emitted. This would only work if the operator was able to hold back watermarks [2].

With this information in mind, I'd like to discuss the following proposal:

- We allow more than one event-time timestamp and store them directly in the Row.
- The query operators ensure that the watermarks are always behind all event-time timestamps. With additional analysis we will be able to restrict this to timestamps that are actually used as such.
- When a DataStream operator is time-based (e.g., a DataStream time-window), we inject an operator that copies the timestamp from the Row into the StreamRecord.
- We try to remove the distinction between logical and physical schema. For event-time timestamps this is because we store them in the Row object; for processing-time timestamps, we add a dummy byte field.
  When accessing a field of this type, the code generator injects the code to fetch the timestamps.
- We might be able to get around the pre-optimization time materialization step.
- A join result would be able to keep both timestamps. The watermark would be held back for both, so both could be used in subsequent operations.

I admit, I haven't thought this completely through.
However, the benefits of this design from my point of view are:
- encoding of timestamps in Rows means that the logical schema is equal to the physical schema
- no timestamp materialization
- support for multiple timestamps. Otherwise we would need to expose internal restrictions to the user which are hard to explain / communicate.
- no need to change any public interfaces at the moment.

The drawbacks as far as I see them are:
- additional payload due to unused timestamp field + possibly the processing-time dummy field
- complete rework of the internal timestamp logic (again...)

Please let me know what you think,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-6233
[2] https://issues.apache.org/jira/browse/FLINK-7245
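
To illustrate the last proposal item (a join result keeps both timestamps and the watermark is held back for both), here is a minimal Python sketch. It is not Flink code and makes several assumptions: the join is a symmetric time-bounded join with the condition |left.rowtime - right.rowtime| <= bound, late rows are not handled, and the names `Row`, `HoldBackJoin`, `on_row`, and `on_watermark` are hypothetical. The idea is that a non-late row (rowtime greater than the current watermark) can still match a buffered row that is up to `bound` older, so the operator forwards `min(left watermark, right watermark) - bound` instead of the raw watermark.

```python
from dataclasses import dataclass

NEG_INF = float("-inf")


@dataclass(frozen=True)
class Row:
    key: str
    rowtime: int  # event-time timestamp stored directly in the row


class HoldBackJoin:
    """Time-bounded join that keeps both rowtimes in its output rows."""

    def __init__(self, bound):
        self.bound = bound
        self.buffers = {"left": [], "right": []}
        self.watermarks = {"left": NEG_INF, "right": NEG_INF}

    def on_row(self, side, row):
        """Buffer the row and emit (key, left_rowtime, right_rowtime) matches."""
        other = "right" if side == "left" else "left"
        self.buffers[side].append(row)
        results = []
        for match in self.buffers[other]:
            same_key = match.key == row.key
            in_window = abs(match.rowtime - row.rowtime) <= self.bound
            if same_key and in_window:
                left, right = (row, match) if side == "left" else (match, row)
                results.append((left.key, left.rowtime, right.rowtime))
        return results

    def on_watermark(self, side, watermark):
        """Record an input watermark and return the held-back output watermark."""
        self.watermarks[side] = max(self.watermarks[side], watermark)
        combined = min(self.watermarks.values())
        # Any timestamp in a future output row is greater than this value.
        held_back = combined - self.bound
        # Rows at or below the held-back watermark cannot match non-late rows.
        for name in self.buffers:
            self.buffers[name] = [
                r for r in self.buffers[name] if r.rowtime > held_back
            ]
        return held_back
```

With `bound=10`, input watermarks of 100 (left) and 98 (right) yield an output watermark of 88, so a subsequent time-based operator could safely use either of the two timestamps carried in the join result.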