Hi all,

At first I also thought that built-in functions (rowtime() and proctime()) would be the easiest solution. However, I think that to be future-proof we should make them system attributes, especially so that they can be related to the corresponding table when multiple tables are involved. Logically they are attributes of each row, which is how the Table API already treats them.

I will ask on the Calcite ML whether there is a good way to integrate system attributes. Right now, I would propose the following implementation:

- we introduce a custom row type (extending RelDataType)
- in a streaming environment every row has two attributes by default (rowtime and proctime)
- we do not allow creating a row type with those attributes (this should already prevent `SELECT field AS rowtime FROM ...`)
- we need to ensure that these attributes are not part of expansion like `SELECT * FROM ...`
- implement some rule/logic that translates the attributes to special RexNodes internally, such that the optimizer does not modify these attributes
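
To make the first four points concrete, here is a minimal, self-contained sketch in plain Python. It is not Calcite or Flink code: the class `StreamRowType` and its methods are hypothetical names chosen for illustration, and the case-insensitive name check is an assumption. It only shows the intended behavior: reserved names are rejected, system attributes can always be resolved, and `SELECT *` does not expand them.

```python
from dataclasses import dataclass
from typing import Tuple

# Reserved names taken from the proposal; everything else is hypothetical.
SYSTEM_ATTRIBUTES = ("rowtime", "proctime")


@dataclass(frozen=True)
class StreamRowType:
    """Illustrative stand-in for a custom row type (not Calcite's RelDataType)."""
    user_fields: Tuple[str, ...]

    def __post_init__(self):
        # Assumption: names are compared case-insensitively.
        clashes = [f for f in self.user_fields if f.lower() in SYSTEM_ATTRIBUTES]
        if clashes:
            raise ValueError(f"reserved system attribute name(s): {clashes}")

    def resolve(self, name: str) -> str:
        """Look up a field by name; system attributes are always resolvable."""
        if name in SYSTEM_ATTRIBUTES or name in self.user_fields:
            return name
        raise KeyError(name)

    def expand_star(self) -> Tuple[str, ...]:
        """Fields produced by `SELECT *`: user fields only."""
        return self.user_fields


row_type = StreamRowType(("a", "b", "c"))
```

With this sketch, `row_type.resolve("proctime")` succeeds although `proctime` is not a user field, while `StreamRowType(("a", "rowtime"))` raises a `ValueError`. The last point of the proposal (special RexNodes) depends on Calcite internals and is not modeled here.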

What do you think?

Regards,
Timo




On 15/02/17 at 03:36, Xingcan Cui wrote:
Hi all,

thanks for this thread.

@Fabian If I didn't miss the point, the main difference between the two
approaches is whether or not these time attributes are treated as common
table fields that are directly available to users. Either way, these time
attributes should be attached to records (right?), and the discussion is
about whether to give them public qualifiers like other common fields, or
private qualifiers with related get/set methods.

The former (system attributes) approach will be more compatible with
existing read-only SQL operations (e.g., select, join), but we would need
to add restrictions on SQL modification operations (like what?). I don't
think there is any need to forbid users from modifying these attributes via
the Table API (e.g., in a map function). We could just inform them about
these special attribute names, much like the built-in system aggregator
names in iterations.

As for the built-in function approach, I don't know whether there are
currently any functions that are applied to a single row (maybe value
access functions like COMPOSITE.get(STRING)?). It seems that most built-in
functions work on a single field or on columns, so it would be a lot of
work to add a new kind of function to SQL. Maybe all existing operations
would have to be modified to support it.

All in all, if single-row functions are already supported, I prefer the
built-in function approach. Otherwise the system attributes approach
should be better. After all, there are not that many modification
operations in SQL, and maybe we can use aliases to support setting time
attributes (just a hypothesis, not sure if it's feasible).

@Haohui I think the given query is valid if we apply an aggregate
function to (PROCTIME() - ROWTIME()) / 1000, and it should be executed
efficiently.
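
As a rough illustration of that point, the following plain-Python sketch evaluates the query with an aggregate applied to the latency expression. The thread does not name an aggregate, so AVG is an assumption here, as are the function name and the millisecond timestamps. Each group only needs a running sum and a count, which is one reason to expect efficient execution.

```python
from collections import defaultdict


def avg_latency_per_minute(rows):
    """rows: iterable of (rowtime_ms, proctime_ms) pairs.

    Returns {minute_start_ms: average latency in seconds}, grouping by the
    processing-time minute as in FLOOR(PROCTIME() TO MINUTES).
    """
    sums = defaultdict(lambda: [0.0, 0])  # per group: [latency sum, row count]
    for rowtime_ms, proctime_ms in rows:
        minute = proctime_ms - proctime_ms % 60_000
        acc = sums[minute]
        acc[0] += (proctime_ms - rowtime_ms) / 1000
        acc[1] += 1
    return {minute: total / n for minute, (total, n) in sums.items()}
```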

Best,
Xingcan

On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:

Hi,

Thanks for starting the discussion. I can see there are multiple trade-offs
in these two approaches. One question I have is to what extent Flink wants
to open its APIs to give users access to both processing and event time.

Before we talk about joins, my understanding is that the two approaches
you mentioned are essentially (1) treating the value of event / processing
time as first-class fields of each row, and (2) limiting the scope of time
indicators to only specifying windows. Take the following query as an
example:

SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY
FLOOR(PROCTIME() TO MINUTES)

There are several questions we can ask:

(1) Is it a valid query?
(2) How efficient will the query be?

For this query I can see arguments from both sides. I think at the end of
the day it really comes down to what Flink wants to support. After working
on FLINK-5624 I'm more inclined to support the second approach (i.e.,
built-in functions). The main reason is that the APIs of Flink are
designed to separate times from the real payloads. It probably makes sense
for the Table / SQL APIs to follow the same design.

For joins I don't have a clear answer off the top of my head. Flink requires
two streams to be put in the same window before doing the joins. This is
essentially a subset of what SQL can express. I don't know what would be
the best approach here.

Regards,
Haohui


On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

It would work as in the query I gave as an example before:

SELECT
   a,
   SUM(b) OVER (PARTITION BY c ORDER BY proctime
                ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
FROM myStream

Here "proctime" would be a system attribute of the table "myStream".
The table would also have another system attribute called "rowtime" which
would be used to indicate event time semantics.
These attributes would always be present in tables which are derived from
streams.
Because we still require that streams have timestamps and watermarks
assigned (either by the StreamTableSource or somewhere downstream in the
DataStream program) when they are converted into a table, there is no
need to register anything.

Does that answer your questions?

Best, Fabian



2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

Hi Fabian,

Thanks for starting the discussion. Before I give my thoughts on this, can
you please give some examples of how you would see the option of using
"system attributes"?
Do you use this when you register the stream as a table, do you use it
when you call an SQL query, or do you use it when you translate a table
back to a stream / write it to a dynamic table?

Dr. Radu Tudoran

-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, February 14, 2017 1:01 AM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL indicators for event and processing time

Hi,

I'd like to start a discussion about how Table API / SQL queries indicate
whether an operation is done in event or processing time.

1) Why do we need to indicate the time mode?

We need to distinguish event time and processing time mode for operations
in queries in order to have the semantics of a query fully defined.
This cannot be globally done in the TableEnvironment because some queries
explicitly request an expression such as the ORDER BY clause of an OVER
window with PRECEDING / FOLLOWING clauses.
So we need a way to specify something like the following query:

SELECT
   a,
   SUM(b) OVER (PARTITION BY c ORDER BY proctime
                ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
FROM myStream

where "proctime" indicates processing time. Equivalently "rowtime" would
indicate event time.
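
For readers less familiar with OVER windows, the query above can be read as the following plain-Python sketch. It assumes that ordering by processing time simply means arrival order, and the function name and tuple layout are hypothetical. For every incoming row it emits the sum of `b` over the current row and the two preceding rows with the same `c`.

```python
from collections import defaultdict, deque


def sum_over_last_rows(rows, preceding=2):
    """rows: iterable of (a, b, c) tuples in arrival (processing-time) order.

    Yields (a, sumB), where sumB covers the current row plus up to
    `preceding` earlier rows of the same partition c.
    """
    # One bounded buffer per partition; old rows fall out automatically.
    windows = defaultdict(lambda: deque(maxlen=preceding + 1))
    for a, b, c in rows:
        window = windows[c]
        window.append(b)
        yield a, sum(window)
```

With "rowtime" instead of "proctime", the rows would have to be ordered by their event timestamps before being added to the buffer, which is why the query has to state the time mode explicitly.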

2) Current state

The current master branch implements time support only for grouping
windows in the Table API.
Internally, the Table API converts a 'rowtime symbol (which looks like a
regular attribute) into a special expression which indicates event-time.
For example:

table
   .window(Tumble over 5.milli on 'rowtime as 'w)
   .groupBy('a, 'w)
   .select(...)

defines a tumbling event-time window.

Processing-time is indicated by omitting a time attribute
(table.window(Tumble over 5.milli as 'w) ).
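
As a simplified illustration of what such a tumbling window does (plain Python, not Flink's actual window assigner; the function names are hypothetical, and window offsets and negative timestamps are ignored), each row is assigned to the fixed-size interval that contains its timestamp and then grouped by key and window:

```python
from collections import Counter


def tumbling_window(timestamp_ms, size_ms):
    """Return the [start, end) interval of the tumbling window for a timestamp."""
    start = timestamp_ms - timestamp_ms % size_ms
    return start, start + size_ms


def count_per_key_and_window(rows, size_ms=5):
    """rows: iterable of (a, timestamp_ms); counts rows per (a, window)."""
    counts = Counter()
    for a, timestamp_ms in rows:
        counts[(a, tumbling_window(timestamp_ms, size_ms))] += 1
    return dict(counts)
```

In this sketch the only difference between the two time modes is where `timestamp_ms` comes from: the row's own rowtime for event time, or the clock reading at the moment the row is processed for processing time.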

3) How can we do that in SQL?

In SQL we cannot add special expressions without touching the parser,
which we don't want to do because we want to stick to the SQL standard.
Therefore, I see only two options: adding system attributes or
(parameterless) built-in functions. I list some pros and cons of the
approaches below:

1. System Attributes:
+ most natural way to access a property of a record.
+ works with joins, because time attributes can be related to tables
- We need to ensure the attributes are not writable and always present in
streaming tables (i.e., they should be system defined attributes).
- Need to adapt existing Table API expressions (will not change the API
but some parts of the internal translation)
- Event time value must be set when the stream is converted, processing
time is evaluated on the fly

2. Built-in Functions
+ Users could try to modify time attributes which is not possible with
functions
- do not work with joins, because we need to address different relations
- not a natural way to access a property of a record

I think the only viable choice is system attributes, because built-in
functions cannot be used for joins.
However, system attributes are the more complex solution because they need
a better integration with Calcite's SQL validator (preventing user
attributes which are named rowtime for instance).

Since there are currently several contributions on the way (such as SQL
OVER windows, FLINK-5653 to FLINK-5658) that need time indicators, we need
a solution soon to be able to make progress.
There are two PRs, #3252 and #3271, which implement the built-in marker
functions proctime() and rowtime() and which could serve as a temporary
solution (since we do not work on joins yet).
I would like to suggest using these functions as a starting point (once
the PRs are merged) and later changing to the system attribute solution,
which needs a bit more time to be implemented.

I talked with Timo today about this issue and he said he would like to
investigate how we can implement this as system functions properly
integrated with Calcite and the SQL Validator.

What do others think?

Best, Fabian

