Hi,

Radu's concerns make sense to me, especially the null timestamp values and
the multiple-proctime question.

I also have something in mind. I would like to propose some time-indicator
built-in functions, e.g., ROW_TIME(Timestamp ts) would generate an
event-time logical attribute and PROC_TIME() would generate a
processing-time logical attribute. This is similar to the TUMBLE_ROWTIME
function proposed in this PR: https://github.com/apache/flink/pull/4199.
These functions could be used in any query, but there still could not be
more than one rowtime attribute or more than one proctime attribute in a
table schema. Both timestamp fields selected from a JOIN query would be
materialized. If someone needs further event-time computations downstream,
they can create a new time attribute using the ROW_TIME(...) function. This
would also solve the null timestamp problem of LEFT JOIN, because we can
use a user-defined function to combine the two rowtimes and mark the result
as the event-time attribute, e.g.:

  SELECT ROW_TIME(udf(T1.rowtime, T2.rowtime)) AS rowtime FROM T1 JOIN T2 ...
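To make this concrete, here is a rough sketch of how such a LEFT JOIN could
look end-to-end (ROW_TIME and PROC_TIME are the proposed functions, not
existing syntax; COALESCE just stands in for the combining udf, and the
tables and columns are made up):

  SELECT
    T1.id,
    -- fall back to the left rowtime when the right side did not match
    ROW_TIME(COALESCE(T2.rowtime, T1.rowtime)) AS rowtime,
    PROC_TIME() AS proctime
  FROM T1
  LEFT JOIN T2 ON T1.id = T2.id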
What do you think?

2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I think this is an interesting discussion and I would like to add some
> issues and give some feedback.
>
> - For supporting the join, we need to think not only of the time but
> also of the null values. For example, if you have a LEFT (or RIGHT) JOIN
> between items of 2 input streams and the secondary input is not
> available, you should still emit Row.of(event1, null)... As far as I
> know, null values do not work when you need to serialize/deserialize
> them to send them. So we should include this scenario in the discussions
> (see the sketch after these points).
> - If we have multiple timestamps in an (output) event, one question is
> how to select afterwards which one is the primary time field to operate
> on. When we describe a query we might be able to specify this (or we get
> it implicitly if we implement the carry-on of the 2 timestamps): SELECT
> T1.rowtime, T2.rowtime ... But if the output of a query is the input of
> a new processing pipeline, do we then also support in general that the
> input has 2 time fields? How do we deal with the 2 time fields (maybe I
> am missing something) further in the DataStream pipeline that we build
> based on the output?
> - For the case of proctime: do we need to carry 2 proctimes (the
> proctimes of the incoming events from each stream), or 1 proctime (as we
> operate on proctime and the combination of the 2 inputs can be
> considered a new event, the current proctime on the machine can be
> considered the (proc)time reference for the output event), or 3
> proctimes (the 2 proctimes of the inputs plus the proctime when the new
> event was created)?
> - Similarly to the point above, for event time (which I understand as
> the time when the event was created... or do we understand it as a time
> carried within the event?): when we join 2 events and output an event
> that is the result of the join, isn't this a new event detached from the
> source/input events? I would tend to say it is a new event, and then, as
> for proctime, the event time of the new event is the current time when
> this output event was created. If we accept this hypothesis, then we
> would not need the 2 input time fields to be carried/managed implicitly.
> If someone needs them further down the computation pipeline, they would
> select them explicitly in the query from the input streams and project
> them into some fields to be carried along (SELECT T1.rowtime AS
> FormerTime1, T2.rowtime AS FormerTime2, ... JOIN T1, T2 ...), but these
> fields would not carry the timestamp logic.
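>
> A rough sketch of the scenario from the first point (Orders and
> Shipments are made-up tables; shipTime would be null for orders without
> a matching shipment):
>
>   SELECT o.orderId, o.rowtime AS orderTime, s.rowtime AS shipTime
>   FROM Orders o
>   LEFT JOIN Shipments s ON o.orderId = s.orderId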
>
> ..my 2 cents
>
>
> Dr. Radu Tudoran
> Staff Research Engineer - Big Data Expert
> IT R&D Division
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Tuesday, July 25, 2017 4:22 PM
> To: dev@flink.apache.org
> Subject: [DISCUSS] Table API / SQL internal timestamp handling
>
> Hi everybody,
>
> I'd like to propose and discuss some changes in the way the Table API /
> SQL internally handles timestamps.
>
> The Table API is implemented on top of the DataStream API. The
> DataStream API hides timestamps from users in order to ensure that
> timestamps and watermarks are aligned. Instead, users assign timestamps
> and watermarks once (usually at the source or in a subsequent operator)
> and let the system handle the timestamps from there on. Timestamps are
> stored in the timestamp field of the StreamRecord, which is a holder for
> the user record and the timestamp. DataStream operators that depend on
> time (time-windows, process function, ...) access the timestamp from the
> StreamRecord.
>
> In contrast to the DataStream API, the Table API and SQL are aware of
> the semantics of a query, i.e., we can analyze how users access
> timestamps and whether they are modified or not. Another difference is
> that the timestamp must be part of the schema of a table in order to
> have correct query semantics.
>
> The current design to handle timestamps is as follows. The Table API
> stores timestamps in the timestamp field of the StreamRecord. Therefore,
> timestamps are detached from the remaining data, which is stored in Row
> objects. Hence, the physical representation of a row is different from
> its logical representation. We introduced a translation layer
> (RowSchema) to convert the logical schema into the physical schema. This
> is necessary for serialization or code generation when the logical plan
> is translated into a physical execution plan. Processing-time timestamps
> are handled similarly. They are not included in the physical schema and
> are looked up when needed. This design also requires that we materialize
> timestamps when they are accessed by expressions. Timestamp
> materialization is done as a pre-optimization step.
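>
> For example (a sketch; a table T with an event-time attribute rowtime is
> made up), a query that uses the timestamp in a scalar expression forces
> it to be materialized into the Row:
>
>   SELECT id, rowtime + INTERVAL '1' HOUR AS deadline FROM T
>
> whereas a query that only uses it as a time indicator, e.g. in a group
> window, can keep it as a logical attribute:
>
>   SELECT COUNT(*) FROM T GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)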
> While thinking about the implementation of the event-time windowed
> stream-stream join [1], I stumbled over the question which timestamp of
> the two input tables to forward. With the current design, we could only
> have a single timestamp, so keeping both timestamps would not be
> possible. The choice of the timestamp would need to be specified by the
> query, otherwise it would lack clear semantics. When executing the join,
> the join operator would need to make sure that no late data is emitted.
> This would only work if the operator were able to hold back watermarks
> [2].
>
> With this information in mind, I'd like to discuss the following
> proposal:
>
> - We allow more than one event-time timestamp and store them directly in
> the Row.
> - The query operators ensure that the watermarks are always behind all
> event-time timestamps. With additional analysis, we will be able to
> restrict this to timestamps that are actually used as such.
> - When a DataStream operator is time-based (e.g., a DataStream
> time-window), we inject an operator that copies the timestamp from the
> Row into the StreamRecord.
> - We try to remove the distinction between logical and physical schema.
> For event-time timestamps this works because we store them in the Row
> object; for processing-time timestamps, we add a dummy byte field. When
> accessing a field of this type, the code generator injects the code to
> fetch the timestamps.
> - We might be able to get around the pre-optimization timestamp
> materialization step.
> - A join result would be able to keep both timestamps. The watermark
> would be held back for both, so both could be used in subsequent
> operations (see the sketch at the end of this mail).
>
> I admit, I haven't thought this completely through.
> However, the benefits of this design from my point of view are:
> - Encoding timestamps in Rows means that the logical schema is equal to
> the physical schema.
> - No timestamp materialization.
> - Support for multiple timestamps. Otherwise, we would need to expose
> internal restrictions to the user which are hard to explain /
> communicate.
> - No need to change any public interfaces at the moment.
>
> The drawbacks as far as I see them are:
> - Additional payload due to the unused timestamp field + possibly the
> processing-time dummy field.
> - Complete rework of the internal timestamp logic (again...).
>
> Please let me know what you think,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-6233
> [2] https://issues.apache.org/jira/browse/FLINK-7245
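>
> To illustrate the join point (a sketch; tables and columns are made up):
> a windowed join could forward both event-time attributes, and a
> subsequent operation could then pick either one:
>
>   SELECT T1.id, T1.rowtime AS rowtime1, T2.rowtime AS rowtime2
>   FROM T1, T2
>   WHERE T1.id = T2.id
>     AND T1.rowtime BETWEEN T2.rowtime - INTERVAL '1' MINUTE
>                        AND T2.rowtime + INTERVAL '1' MINUTE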