Hi all,

@Shaoxuan - thanks for the remarks. I have a question regarding your suggestion not to allow proctime windows on a regular column. I think this would be useful, though. First, you might need to carry a timestamp indicating when the processing happened (for logging, provenance, traceability, ...). Second, I do not think it contradicts the semantics of batch SQL: SQL has the function "now()", which carries pretty much the same semantics as a function that marks the proctime and then projects it into a column. If I am not mistaken, you can store the result of calling now() in a database column.
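As a small illustration of the now() analogy, here is a minimal sketch in plain Python, assuming SQLite as a stand-in for a batch SQL engine (it is not Flink code). SQLite's datetime('now') plays the role of now(); the table `events`, the column `processed_at`, and the helper `store_processing_time` are hypothetical names chosen for this example.

```python
import sqlite3


def store_processing_time(payload):
    """Insert one row whose processed_at column is filled from the wall clock."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (payload TEXT, processed_at TEXT)")
    # datetime('now') is evaluated once, at insert time, and the result is
    # stored like any other column value (assumption: SQLite stands in for
    # the now() function of a batch SQL engine).
    conn.execute(
        "INSERT INTO events (payload, processed_at) VALUES (?, datetime('now'))",
        (payload,),
    )
    query = "SELECT payload, processed_at FROM events"
    first_read = conn.execute(query).fetchone()
    second_read = conn.execute(query).fetchone()
    conn.close()
    return first_read, second_read


first_read, second_read = store_processing_time("event1")
print(first_read)
```

Once stored, the value behaves like a regular column: reading it twice returns the same timestamp, which is the property needed for logging or provenance.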
Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!

-----Original Message-----
From: Shaoxuan Wang [mailto:shaox...@apache.org]
Sent: Thursday, July 27, 2017 6:00 AM
To: Dev
Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling

Hi Everyone,

I like this proposal. The problem we used to have is that we treated the eventtime column as a special timestamp column. An eventtime column is no different from any other regular column, except that it carries a flag (eventtime-indicator) stating that the column can be used as event time to decide when a bounded query can emit its final result by comparing it with the associated watermark. I have a few comments on top of this (they may already have been addressed in the conversation; since it's a long discussion, I may have missed something):

1. While we remove the timestamp column, we introduce an eventtime-indicator (we may already have this concept). It is only a flag that can be applied to any column (note that some types may not be usable as an eventtime column), indicating whether the column can be used as eventtime or not. This flag is useful for validation and codeGen.
2. A column that has been used as eventtime should not lose its own type. We should not cast every eventtime column to the timestamp type. For instance, if a column is of type long, it stays a long even if a window aggregate has used it as eventtime.
3. Eventtime only works well with an associated watermark strategy. We may consider forcing the user to provide watermark logic for his/her selected eventtime.
4. For proctime, I hope we do not introduce a proctime-indicator for regular columns. Ideally we should not allow users to create a proctime window on a regular column, as this is against the batch query semantics. Therefore I suggest we always introduce a proctime timestamp column for users to create proctime windows. Unlike eventtime, proctime does not need any associated watermark strategy, as there is no out-of-order issue for proctime.

Regards,
Shaoxuan

On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Thanks everybody for the replies so far.
>
> Let me answer your questions and reply to your thoughts:
>
> Radu:
> ---
> First of all, although my proposal is motivated by a join operator, this discussion is about timestamp handling, not about joins in general.
>
> - The semantics of outer joins is to emit null and there is no way around that. This is not an issue for us. Actually, outer joins are supported by the batch SQL / Table API. It is true that outer joins might result in null timestamps. Calcite will mark those fields as nullable and we should check that timestamps which are used in windows or joins are not nullable.
> - The query has to explicitly specify which timestamp attribute to use. Otherwise its semantics are not complete and it is invalid. A group-window that follows a join will reference a timestamp attribute and this will be used. The other timestamp might be projected out. When a result with two timestamps is converted into a DataStream, the user has to decide. This could be done inside of the Table to DataStream conversion. If the Table has more than one valid timestamp, the conversion will ask which timestamp to forward.
> - A proctime join should forward all proctime attributes of the input tables. All will be the same, but that does not matter because they are either virtual or represented as 1 byte dummy attributes. Also, unused ones will be automatically projected out anyway.
> - An event-time join should forward all event-time attributes of the input tables. Creating a new event-time attribute using processing time makes event-time processing pointless and will give completely random results.
>
> Event-time is not about the "time an event is created" but about a timestamp that is associated with an event. For example an order event could have three timestamps: "orderTime", "shipTime", and "receiveTime". Each could be a valid event-time attribute.
>
> Jark:
> ---
> Thanks for the proposal. I think I understand what you want to achieve with this, but I think functions to instantiate time attributes are not necessary and would make things more complicated. The point of supporting multiple time attributes is to ensure that all of them are aligned with the watermarks. If we add a method ROW_TIME(timestamp), we don't know whether the timestamp is aligned with the watermarks. If that is not the case, the query won't be executed as expected. The issue of LEFT JOIN can easily be addressed by checking for nullability during optimization when an operator tries to use it.
>
> The beauty of supporting multiple timestamps is that a user does not have to care at all about timestamps (or timestamp functions) and watermarks. As long as the query uses a timestamp attribute that was originally declared as rowtime in a source table (and was not modified afterwards), this is fine. Think of a cascade of three windowed joins: R - S - T - U, and you want to join S - T first. In that case, you need to preserve the timestamps of S and T in order to join R and U. From a relational algebra point of view, there is no reason to have a limitation on how these attributes are accessed. Timestamps are just regular fields of a record. The only restriction in the context of stream processing is that the watermark must be aligned with timestamps, i.e., follow all timestamps such that data is not late according to any of the timestamps. This we can achieve and handle internally without the user having to worry about it.
>
> Xingcan:
> ---
> I think your questions are mostly implementation details and not so much related to the original proposal of supporting multiple timestamps.
>
> My take on your questions is:
> 1. The rate at which watermarks are emitted is not important for the correctness of a query. However, it can affect the performance, because each watermark is sent as a special record and it is broadcasted. My initial take would be to emit a new watermark whenever the operator updated its watermark because usually, the operator would have forwarded the old watermark.
> 2. I would say this is the responsibility of the operator because first it is not related to the semantics of the query and second it is an operator responsibility in the existing code as well.
>
> Jark 2:
> You are right, the query (or user) must decide on the event-time attribute to use. My main point is, it is much easier for the user (and for us internally) if we internally track multiple timestamps.
> Because then we do not have to prune, in the join, the timestamp that will not be used later. Moreover, both timestamps might be used later (see join example, which could be reordered of course). All we have to do is to ensure that all timestamps are aligned with the watermarks.
>
> Radu 2:
> IMO, time (or anything else that affects the semantics) should never be decided by the system. If we did that, a query would not be fully specified or, even worse, the way it is executed would be semantically incorrect and produce arbitrary results.
>
> Time attributes should be specified in the source tables and then forwarded from there. So far I haven't seen an example where this would not be possible (within the semantics of relational queries). If we do that right, there won't be a need for explicit time management except for the definition of the initial timestamps which can be hidden in the table definition. As I said before, we (or the system) cannot decide on the timestamp because that would lead to arbitrary results. Asking the user to do that would mean explicit time management which is also not desirable. I think my proposal gives users all options (timestamps) to choose from and the system can do the rest.
>
> Best, Fabian
>
> 2017-07-26 10:46 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi everyone,
> >
> > I just want to add that I was referring to NULL values not specifically for time fields but for the event itself. If you have the following situation
> >
> > Stream 1: .... | event1 | ....
> > Stream 2: .... |        | ....
> >
> > and you have a LEFT JOIN between stream 1 and stream 2 (no condition), then you still need to emit (event1, null), as this is the behavior of a left join. This is maybe a very simple situation, but the point is that left joins and right joins can have situations where you have elements only in the main stream and no element in the right stream.
> > And for this case you still need to emit.
> >
> > Regarding whether time should be decided by the system or not... I think the answer is: it depends. I think the example from Jark is very good and shows the need for some mechanism to select/manage the time (I like the proposal of having functions to insert the time in the output!). However, if a business analyst writes a query without explicit time management, we still need some default behavior in the system. As per my initial proposal, I think we need to decide on one timestamp field to carry: either a new one created at the moment of the join, or the timestamp from the main stream (...although I am not sure which one is the main stream in the case of a full join :) )
> >
> > Dr. Radu Tudoran
> >
> > -----Original Message-----
> > From: Jark Wu [mailto:j...@apache.org]
> > Sent: Wednesday, July 26, 2017 8:29 AM
> > To: dev@flink.apache.org
> > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> >
> > Hi Xingcan,
> >
> > IMO, the event-time of join results cannot be decided automatically by the system. Consider batch tables: if users want an event-time window aggregation after a join, they must specify the time field explicitly (T1.rowtime or T2.rowtime or a result computed from them). So in the case of streaming tables, the system also can't automatically decide the time field for users.
> >
> > Regarding the question you asked, I think we don't need to change the watermark no matter whether we choose the left rowtime, the right rowtime, or a combination, because the watermark has already been aligned with the rowtime in the source. Maybe I'm wrong about this, please correct me if I'm missing something.
> >
> > What do you think?
> >
> > Regards,
> > Jark
> >
> > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
> >
> > > Hi all,
> > >
> > > @Fabian, thanks for raising this.
> > >
> > > @Radu and Jark, personally I think the timestamp field is critical for query processing and thus should be declared as (or supposed to be) NOT NULL. In addition, I think the event-time semantics of the join results should be automatically decided by the system, i.e., we do not hand it over to users, so as to avoid unpredictable assignments.
> > >
> > > Generally speaking, consolidating different time fields is possible since all of them should ideally be monotonically increasing. From my point of view, the problem lies in (1) what the relationship between the old and new watermarks is. Should they map one-to-one, or could the new watermarks skip some timestamps?
> > > And (2) who is in charge of emitting the blocked watermarks, the operator or the process function?
> > >
> > > I'd like to hear from you.
> > >
> > > Best,
> > > Xingcan
> > >
> > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <j...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > Radu's concerns make sense to me, especially the null value timestamp and multi-proctime.
> > > >
> > > > I also have something in mind. I would like to propose some built-in time indicator functions, e.g. ROW_TIME(Timestamp ts) will generate an event-time logical attribute, and PROC_TIME() will generate a processing-time logical attribute. This is similar to TUMBLE_ROWTIME proposed in this PR https://github.com/apache/flink/pull/4199. These can be used in any query, but there still can't be more than one rowtime attribute or more than one proctime attribute in a table schema.
> > > >
> > > > Both selected timestamp fields from a JOIN query will be materialized. If someone needs further computation based on event time, they need to create a new time attribute using the ROW_TIME(...) function. This can also solve the null timestamp problem in LEFT JOIN, because we can use a user-defined function to combine the two rowtimes and make the result the event-time attribute, e.g. SELECT ROW_TIME(udf(T1.rowtime, T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
> > > >
> > > > What do you think?
> > > >
> > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > > >
> > > > > Hi,
> > > > >
> > > > > I think this is an interesting discussion and I would like to add some issues and give some feedback.
> > > > >
> > > > > - For supporting the join we do not only need to think of the time but also of the null values.
> > > > > For example, if you have a LEFT (or RIGHT) JOIN between items of 2 input streams, and the secondary input is not available, you should still emit Row.of(event1, null)... As far as I know, null values do not work if you need to serialize/deserialize them in order to send them. So we should include this scenario in the discussion.
> > > > > - If we have multiple timestamps in an (output) event, one question is how to select afterwards which is the primary time field on which to operate. When we describe a query we might be able to specify (or we get this implicitly if we implement the carrying-on of the 2 timestamps) Select T1.rowtime, T2.rowtime ... but if the output of a query is the input of a new processing pipeline, do we then generally also support that the input has 2 time fields? ... How do we deal with the 2 input fields (maybe I am missing something) further in the DataStream pipeline that we build based on the output?
> > > > > - For the case of proctime: do we need to carry 2 proctimes (the proctimes of the incoming events from each stream), or 1 proctime (as we operate on proctime and the combination of the 2 inputs can be considered a new event, the current proctime on the machine can be considered the (proc)time reference for the output event), or 3 proctimes (the 2 proctimes of the input plus the proctime when the new event was created)?
> > > > > - Similar to the point above, for event time (which I understand as the time when the event was created... or do we understand it as a time carried within the event?)
> > > > > when we join 2 events and output an event that is the result of the join, isn't this a new event detached from the source/input events? ... I would tend to say it is a new event, and then, as for proctime, the event time of the new event is the current time when this output event was created. If we accept this hypothesis, then we would not need the 2 input time fields to be carried/managed implicitly. If someone needs them further down the computation pipeline, then in the query they would be selected explicitly from the input stream and projected into some fields to be carried (Select T1.rowtime as FormerTime1, T2.rowtime as FormerTime2, .... JOIN T1, T2...) ... but they would not have the timestamp logic
> > > > >
> > > > > ..my 2 cents
> > > > >
> > > > > Dr. Radu Tudoran
> > > > >
> > > > > -----Original Message-----
> > > > > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > > > > Sent: Tuesday, July 25, 2017 4:22 PM
> > > > > To: dev@flink.apache.org
> > > > > Subject: [DISCUSS] Table API / SQL internal timestamp handling
> > > > >
> > > > > Hi everybody,
> > > > >
> > > > > I'd like to propose and discuss some changes in the way the Table API / SQL internally handles timestamps.
> > > > >
> > > > > The Table API is implemented on top of the DataStream API. The DataStream API hides timestamps from users in order to ensure that timestamps and watermarks are aligned. Instead users assign timestamps and watermarks once (usually at the source or in a subsequent operator) and let the system handle the timestamps from there on. Timestamps are stored in the timestamp field of the StreamRecord which is a holder for the user record and the timestamp. DataStream operators that depend on time (time-windows, process function, ...) access the timestamp from the StreamRecord.
> > > > >
> > > > > In contrast to the DataStream API, the Table API and SQL are aware of the semantics of a query. I.e., we can analyze how users access timestamps and whether they are modified or not. Another difference is that the timestamp must be part of the schema of a table in order to have correct query semantics.
> > > > >
> > > > > The current design to handle timestamps is as follows.
> > > > > The Table API stores timestamps in the timestamp field of the StreamRecord. Therefore, timestamps are detached from the remaining data which is stored in Row objects. Hence, the physical representation of a row is different from its logical representation. We introduced a translation layer (RowSchema) to convert logical schema into physical schema. This is necessary for serialization or code generation when the logical plan is translated into a physical execution plan. Processing-time timestamps are similarly handled. They are not included in the physical schema and looked up when needed. This design also requires that we need to materialize timestamps when they are accessed by expressions. Timestamp materialization is done as a pre-optimization step.
> > > > >
> > > > > While thinking about the implementation of the event-time windowed stream-stream join [1] I stumbled over the question which timestamp of both input tables to forward. With the current design, we could only have a single timestamp, so keeping both timestamps would not be possible. The choice of the timestamp would need to be specified by the query otherwise it would lack clear semantics. When executing the join, the join operator would need to make sure that no late data is emitted. This would only work if the operator was able to hold back watermarks [2].
> > > > >
> > > > > With this information in mind, I'd like to discuss the following proposal:
> > > > >
> > > > > - We allow more than one event-time timestamp and store them directly in the Row
> > > > > - The query operators ensure that the watermarks are always behind all event-time timestamps.
> > > > > With additional analysis we will be able to restrict this to timestamps that are actually used as such.
> > > > > - When a DataStream operator is time-based (e.g., a DataStream time-window), we inject an operator that copies the timestamp from the Row into the StreamRecord.
> > > > > - We try to remove the distinction between logical and physical schema. For event-time timestamps this is because we store them in the Row object, for processing-time timestamps, we add a dummy byte field. When accessing a field of this type, the code generator injects the code to fetch the timestamps.
> > > > > - We might be able to get around the pre-optimization time materialization step.
> > > > > - A join result would be able to keep both timestamps. The watermark would be held back for both so both could be used in subsequent operations.
> > > > >
> > > > > I admit, I haven't thought this completely through.
> > > > > However, the benefits of this design from my point of view are:
> > > > > - encoding of timestamps in Rows means that the logical schema is equal to the physical schema
> > > > > - no timestamp materialization
> > > > > - support for multiple timestamps. Otherwise we would need to expose internal restrictions to the user which are hard to explain / communicate.
> > > > > - no need to change any public interfaces at the moment.
> > > > >
> > > > > The drawbacks as far as I see them are:
> > > > > - additional payload due to unused timestamp field + possibly the processing-time dummy field
> > > > > - complete rework of the internal timestamp logic (again...)
> > > > >
> > > > > Please let me know what you think,
> > > > > Fabian
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
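
The idea from the original proposal, that a join result keeps both event-time timestamps while the watermark is held back behind all of them, can be sketched as follows. This is a minimal sketch in plain Python, not Flink code. The names `JoinedRow`, `HeldBackWatermark` and `is_late` are hypothetical, and the hold-back rule (minimum of the two input watermarks minus the join window size) is an assumption for a windowed join with the condition |left_rowtime - right_rowtime| <= window; it is not taken from FLINK-7245.

```python
from dataclasses import dataclass


@dataclass
class JoinedRow:
    """A join result that keeps the event-time timestamps of both inputs."""
    left_rowtime: int
    right_rowtime: int
    payload: tuple


class HeldBackWatermark:
    """Tracks both input watermarks and derives a held-back output watermark."""

    def __init__(self, window_ms):
        self.window_ms = window_ms
        self.left_wm = float("-inf")
        self.right_wm = float("-inf")

    def on_watermark(self, side, watermark):
        # Watermarks only move forward on each input.
        if side == "left":
            self.left_wm = max(self.left_wm, watermark)
        else:
            self.right_wm = max(self.right_wm, watermark)
        return self.current()

    def current(self):
        # Assumed rule: a future result pairs a new row (newer than its input
        # watermark) with a buffered row at most window_ms older, so no
        # timestamp in a future result can be <= min(inputs) - window_ms.
        return min(self.left_wm, self.right_wm) - self.window_ms


def is_late(row, watermark):
    """A row is late if any of its timestamps is not ahead of the watermark."""
    return min(row.left_rowtime, row.right_rowtime) <= watermark


op = HeldBackWatermark(window_ms=10)
op.on_watermark("left", 100)
out_wm = op.on_watermark("right", 120)
row = JoinedRow(left_rowtime=101, right_rowtime=91, payload=("l", "r"))
print(out_wm, is_late(row, out_wm))
```

With a window of 10, input watermarks of 100 and 120 give an output watermark of 90, so a result carrying the timestamps 101 and 91 is not late for either attribute, and a subsequent operator could use either one.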