Hi Fabian,

Thanks for starting the discussion. Before I give my thoughts on this, can you 
please give some examples of how you see "system attributes" being used?
Would you use them when you register the stream as a table, when you call an 
SQL query, or when you translate a table back to a stream / write it to a 
dynamic table?

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division


-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Tuesday, February 14, 2017 1:01 AM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL indicators for event and processing time

Hi,

I'd like to start a discussion about how Table API / SQL queries indicate 
whether an operation is done in event or processing time.

1) Why do we need to indicate the time mode?

We need to distinguish event time and processing time mode for operations in 
queries in order to have the semantics of a query fully defined.
This cannot be done globally in the TableEnvironment because some operations 
explicitly require a time expression, such as the ORDER BY clause of an OVER 
window with PRECEDING / FOLLOWING clauses.
So we need a way to specify something like the following query:

SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY proctime
               ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
FROM myStream

where "proctime" indicates processing time. Analogously, "rowtime" would 
indicate event time.
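
To make the semantics of the query above concrete, here is a minimal Python sketch (plain Python, not Flink code) of what the OVER clause computes when rows are ordered by arrival, i.e. by processing time. The function name `running_sum_last_rows` and the sample rows are made up for illustration.

```python
from collections import defaultdict, deque


def running_sum_last_rows(rows, preceding=2):
    """Emulate SUM(b) OVER (PARTITION BY c ORDER BY <arrival order>
    ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW).

    `rows` is an iterable of (a, b, c) tuples in arrival order.
    Yields (a, sumB) for every input row.
    """
    # One bounded buffer per partition key c; it holds the current row's b
    # plus at most `preceding` earlier values of the same partition.
    windows = defaultdict(lambda: deque(maxlen=preceding + 1))
    for a, b, c in rows:
        window = windows[c]
        window.append(b)  # the oldest value drops out automatically
        yield a, sum(window)


# Hypothetical sample data: (a, b, c)
stream = [("r1", 1, "x"), ("r2", 2, "x"), ("r3", 10, "y"),
          ("r4", 3, "x"), ("r5", 4, "x")]
result = list(running_sum_last_rows(stream))
```

Feeding the same rows in a different order generally yields different sums, which is why the query has to state whether the order is given by processing time or by event time.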

2) Current state

The current master branch implements time support only for grouping windows in 
the Table API.
Internally, the Table API converts a 'rowtime symbol (which looks like a 
regular attribute) into a special expression which indicates event-time.
For example:

table
  .window(Tumble over 5.milli on 'rowtime as 'w)
  .groupBy('a, 'w)
  .select(...)

defines a tumbling event-time window.

Processing-time is indicated by omitting a time attribute (table.window(Tumble 
over 5.milli as 'w) ).
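
The difference between the two modes can be illustrated with a small Python sketch (again plain Python, not the Table API). It assigns records to 5 ms tumbling windows either by a timestamp carried in the record or by the time the record arrives at the operator. The field names `rowtime` and `arrival`, the helper names, and the sample records are assumptions made for this example.

```python
from collections import Counter


def tumble_start(timestamp_ms, size_ms=5):
    """Return the start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def count_per_window(records, time_of):
    """Count records per (key, window start); `time_of` selects the timestamp."""
    counts = Counter()
    for rec in records:
        counts[(rec["a"], tumble_start(time_of(rec)))] += 1
    return dict(counts)


# Hypothetical records: "rowtime" travels with the record (event time),
# "arrival" is when the operator happens to see it (processing time).
records = [
    {"a": "k", "rowtime": 1, "arrival": 7},
    {"a": "k", "rowtime": 4, "arrival": 8},
    {"a": "k", "rowtime": 6, "arrival": 12},
]
event_time_counts = count_per_window(records, lambda r: r["rowtime"])
proc_time_counts = count_per_window(records, lambda r: r["arrival"])
```

The same three records end up in different windows depending on which timestamp is used, so the result of the query is only well defined once the time mode is known.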

3) How can we do that in SQL?

In SQL we cannot add special expressions without touching the parser, which we 
don't want to do because we want to stick to the SQL standard.
Therefore, I see only two options: adding system attributes or
(parameterless) built-in functions. I list some pros and cons of the approaches 
below:

1. System Attributes:
+ most natural way to access a property of a record.
+ works with joins, because time attributes can be related to tables
- We need to ensure the attributes are not writable and always present in 
streaming tables (i.e., they should be system defined attributes).
- Need to adapt existing Table API expressions (will not change the API but 
some parts of the internal translation)
- Event time value must be set when the stream is converted, processing time is 
evaluated on the fly

2. Built-in Functions
+ Users cannot modify the time indicators, which they might try to do with 
attributes
- do not work with joins, because a parameterless function cannot say which 
input relation it refers to
- not a natural way to access a property of a record

I think the only viable choice is system attributes, because built-in 
functions cannot be used for joins.
However, system attributes are the more complex solution because they need 
tighter integration with Calcite's SQL validator (for instance, to prevent 
user-defined attributes named rowtime).

Since there are currently several contributions under way (such as SQL OVER 
windows FLINK-5653 to FLINK-5658) that need time indicators, we need a solution 
soon to be able to make progress.
There are two PRs, #3252 and #3271, which implement the built-in marker 
functions proctime() and rowtime() and which could serve as a temporary 
solution (since we do not work on joins yet).
I suggest using these functions as a starting point (once the PRs are merged) 
and later changing to the system attribute solution, which needs a bit more 
time to be implemented.

I talked with Timo today about this issue and he said he would like to 
investigate how we can implement this as system functions properly integrated 
with Calcite and the SQL Validator.

What do others think?

Best, Fabian
