Hi,

Radu's concerns make sense to me, especially the null-valued timestamp and
the multiple-proctime question.

I also have something in mind. I would like to propose some time-indicator
built-in functions, e.g., ROW_TIME(Timestamp ts) would generate an
event-time logical attribute and PROC_TIME() would generate a
processing-time logical attribute. This is similar to the TUMBLE_ROWTIME
function proposed in this PR:
https://github.com/apache/flink/pull/4199. These could be used in any
query, but there still could not be more than one rowtime attribute or
more than one proctime attribute in a table schema.
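
For example, a query using the proposed functions might look like the
following sketch (ROW_TIME and PROC_TIME do not exist yet; tEnv stands for
a StreamTableEnvironment, and Orders / its fields are hypothetical):

  // Sketch only: ROW_TIME / PROC_TIME are the proposed built-ins.
  val result = tEnv.sql(
    "SELECT ROW_TIME(ts) AS rowtime, PROC_TIME() AS proctime, orderId " +
    "FROM Orders")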

Both timestamp fields selected from a JOIN query would be materialized.
If someone needs further event-time computations downstream, they need to
create a new time attribute using the ROW_TIME(...) function. This can
also solve the null-timestamp problem of the LEFT JOIN, because we can use
a user-defined function to combine the two rowtimes and declare the result
as the event-time attribute, e.g. SELECT ROW_TIME(udf(T1.rowtime,
T2.rowtime)) AS rowtime FROM T1 JOIN T2 ...
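
For instance, if the udf were simply a COALESCE over the two rowtimes, the
LEFT JOIN case could look like this sketch (again assuming the proposed
ROW_TIME function and hypothetical tables T1 and T2):

  // Hypothetical: take the first non-null of the two rowtimes and
  // declare the result as the new event-time attribute.
  val joined = tEnv.sql(
    "SELECT ROW_TIME(COALESCE(T1.rowtime, T2.rowtime)) AS rowtime, " +
    "  T1.a, T2.b " +
    "FROM T1 LEFT JOIN T2 ON T1.id = T2.id")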


What do you think?


2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I think this is an interesting discussion and I would like to add some
> issues and give some feedback.
>
> - For supporting the join, we do not only need to think of the time but
> also of the null values. For example, if you have a LEFT (or RIGHT) JOIN
> between items of 2 input streams and the secondary input is not
> available, you should still emit Row.of(event1, null)... As far as I
> know, null values do not work when they need to be
> serialized/deserialized to be sent. So we should include this scenario in
> the discussions.
> - If we will have multiple timestamps in an (output) event, one question
> is how to select afterwards which one is the primary time field to
> operate on. When we describe a query, we might be able to specify this
> (or we get it implicitly if we implement the carrying of the 2
> timestamps): SELECT T1.rowtime, T2.rowtime ... But if the output of a
> query is the input of a new processing pipeline, do we then generally
> also support an input with 2 time fields? How do we deal with the 2 time
> fields (maybe I am missing something) further down the DataStream
> pipeline that we build based on the output?
> - For the case of proctime: do we need to carry 2 proctimes (the
> proctimes of the incoming events from each stream), or 1 proctime (since
> we operate on proctime, the combination of the 2 inputs can be considered
> a new event, and the current proctime on the machine can be considered
> the (proc)time reference for the output event), or 3 proctimes (the 2
> proctimes of the inputs plus the proctime when the new event was
> created)?
> - Similar to the point above, for event time (which I understand as the
> time when the event was created... or do we understand it as a time
> carried within the event?): when we join 2 events and output an event
> that is the result of the join, isn't this a new event, detached from the
> source/input events? I would tend to say it is a new event, and then, as
> for proctime, the event time of the new event is the current time when
> this output event was created. If we accept this hypothesis, then we
> would not need the 2 input time fields to be carried/managed implicitly.
> If someone needs them further down the computation pipeline, then in the
> query they would be selected explicitly from the input stream and
> projected into some fields to be carried (SELECT T1.rowtime AS
> FormerTime1, T2.rowtime AS FormerTime2, ... JOIN T1, T2 ...), but these
> fields would not have the timestamp logic.
>
> ...my 2 cents
>
>
>
>
> Dr. Radu Tudoran
> Staff Research Engineer - Big Data Expert
> IT R&D Division
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Tuesday, July 25, 2017 4:22 PM
> To: dev@flink.apache.org
> Subject: [DISCUSS] Table API / SQL internal timestamp handling
>
> Hi everybody,
>
> I'd like to propose and discuss some changes in the way the Table API /
> SQL internally handles timestamps.
>
> The Table API is implemented on top of the DataStream API. The
> DataStream API hides timestamps from users in order to ensure that
> timestamps and watermarks are aligned. Instead, users assign timestamps
> and watermarks once (usually at the source or in a subsequent operator)
> and let the system handle the timestamps from there on. Timestamps are
> stored in the timestamp field of the StreamRecord, which is a holder for
> the user record and the timestamp. DataStream operators that depend on
> time (time windows, process functions, ...) access the timestamp from the
> StreamRecord.
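>
> For illustration, such a one-time assignment (in Scala) looks roughly
> like this; the Event type and its ts field are made up:
>
>   import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>   import org.apache.flink.streaming.api.windowing.time.Time
>
>   // Assign timestamps and watermarks once; from here on the system
>   // carries them in the StreamRecord, invisible to the user.
>   val withTimestamps = events.assignTimestampsAndWatermarks(
>     new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(5)) {
>       override def extractTimestamp(e: Event): Long = e.ts
>     })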
>
> In contrast to the DataStream API, the Table API and SQL are aware of
> the semantics of a query, i.e., we can analyze how users access
> timestamps and whether they are modified or not. Another difference is
> that the timestamp must be part of the schema of a table in order to have
> correct query semantics.
>
> The current design to handle timestamps is as follows. The Table API
> stores timestamps in the timestamp field of the StreamRecord. Therefore,
> timestamps are detached from the remaining data, which is stored in Row
> objects. Hence, the physical representation of a row is different from
> its logical representation. We introduced a translation layer (RowSchema)
> to convert the logical schema into the physical schema. This is necessary
> for serialization or code generation when the logical plan is translated
> into a physical execution plan. Processing-time timestamps are handled
> similarly. They are not included in the physical schema and are looked up
> when needed. This design also requires that we materialize timestamps
> when they are accessed by expressions. Timestamp materialization is done
> as a pre-optimization step.
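>
> To make the logical/physical split concrete, an illustration with a
> made-up schema:
>
>   // logical schema:  (a: String, b: Int, rowtime: Timestamp)
>   // physical today:  StreamRecord(timestamp = rowtime,
>   //                               value = Row(a, b))
>   // i.e., the rowtime lives in the StreamRecord, not in the Row;
>   // RowSchema maps between the two representations.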
>
> While thinking about the implementation of the event-time windowed
> stream-stream join [1], I stumbled over the question of which timestamp
> of the two input tables to forward. With the current design, we could
> only have a single timestamp, so keeping both timestamps would not be
> possible. The choice of the timestamp would need to be specified by the
> query, otherwise it would lack clear semantics. When executing the join,
> the join operator would need to make sure that no late data is emitted.
> This would only work if the operator were able to hold back watermarks
> [2].
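>
> For example, such a windowed join might look like the following sketch
> (hypothetical tables T1 and T2, each with a rowtime attribute):
>
>   // Which rowtime should the result of this join carry, T1's or T2's?
>   val q = tEnv.sql(
>     "SELECT T1.id, T1.rowtime, T2.rowtime " +
>     "FROM T1, T2 " +
>     "WHERE T1.id = T2.id AND T1.rowtime " +
>     "  BETWEEN T2.rowtime - INTERVAL '1' HOUR AND T2.rowtime")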
>
> With this information in mind, I'd like to discuss the following proposal:
>
> - We allow more than one event-time timestamp and store them directly in
> the Row
> - The query operators ensure that the watermarks are always behind all
> event-time timestamps. With additional analysis we will be able to restrict
> this to timestamps that are actually used as such.
> - When a DataStream operator is time-based (e.g., a DataStream
> time-window), we inject an operator that copies the timestamp from the
> Row into the StreamRecord (see the sketch after this list).
> - We try to remove the distinction between logical and physical schema.
> For event-time timestamps this works because we store them in the Row
> object; for processing-time timestamps, we add a dummy byte field. When
> accessing a field of this type, the code generator injects the code to
> fetch the timestamp.
> - We might be able to get around the pre-optimization timestamp
> materialization step.
> - A join result would be able to keep both timestamps. The watermark
> would be held back for both, so both could be used in subsequent
> operations.
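>
> The timestamp-copying step could be approximated with the existing
> public API roughly as follows (just a sketch; the actual injected
> operator would be internal, and the field index is made up):
>
>   import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
>   import org.apache.flink.types.Row
>
>   // Copy the event-time timestamp from a field of the Row back into
>   // the StreamRecord before a time-based DataStream operator runs.
>   val withTs = rowStream.assignTimestampsAndWatermarks(
>     new AscendingTimestampExtractor[Row] {
>       override def extractAscendingTimestamp(row: Row): Long =
>         row.getField(2).asInstanceOf[Long]
>     })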
>
> I admit, I haven't thought this completely through.
> However, the benefits of this design from my point of view are:
> - encoding of timestamps in Rows means that the logical schema is equal to
> the physical schema
> - no timestamp materialization
> - support for multiple timestamps; otherwise, we would need to expose
> internal restrictions to the user, which are hard to explain /
> communicate.
> - no need to change any public interfaces at the moment.
>
> The drawbacks as far as I see them are:
> - additional payload due to the unused timestamp field + possibly the
> processing-time dummy field
> - complete rework of the internal timestamp logic (again...)
>
> Please let me know what you think,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-6233
> [2] https://issues.apache.org/jira/browse/FLINK-7245
>
