[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16103282#comment-16103282 ]

Xingcan Cui commented on FLINK-7245:
------------------------------------

Hi [~fhueske], it took me some time to understand how rowtime works in the
current Table/SQL API. Before continuing the work, I'd like to share some of my
understanding and questions, which may be a little *detailed*. Could you help
confirm or answer them?
Suppose there's a class {{Order(a:Long, b:String)}}.
# When registering a rowtime attribute with the API, e.g.,
{{tEnv.registerDataStream("OrderA", orderA, 'a.rowtime, 'b)}}, I think the
current logic is that field {{a}} is masked in the physical schema and an
extra indicator for the rowtime field is added to the logical schema. I
found the following snippet in {{StreamTableEnvironment.extractRowtime()}}.
{code:scala}
if (mappedIdx < 0) {
  throw new TableException(
    s"The rowtime attribute can only replace a valid field. " +
    s"${origName.getOrElse(name)} is not a field of type $streamType.")
}
{code}
However, when I tried {{tEnv.registerDataStream("OrderA", orderA, 'a, 'b,
'c.rowtime)}}, the registration also succeeded, with a new field {{c}} appended.
I know that both allowing and rejecting such an extra field make sense, but I'm
still confused about which behavior is intended.
# When a SQL query is translated, the rowtime field is dropped by the initial
"{{Order}} to {{CRow}}" operator.
# The planner checks whether the rowtime field is used in the SQL query. If it
is, this special field is set from {{ctx.timestamp()}} by a generated function
in the following operator.
# The user has to assign watermarks manually before registering the
DataStream. Since the rowtime field is now treated as a regular field, should
we consider adding a configurable {{DefaultWatermarkAssigner}} for the case
where none is provided?
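For item 4, here is a minimal sketch of what such a default could look like, written in plain Java so that it runs without Flink on the classpath. It assumes a bounded-out-of-orderness strategy (similar in spirit to Flink's {{BoundedOutOfOrdernessTimestampExtractor}}). The class name {{DefaultWatermarkAssigner}} comes from the proposal above; its constructor, its methods and the {{Order}} class are hypothetical and only for illustration.

{code:java}
import java.util.function.ToLongFunction;

// Mirrors the example class Order(a: Long, b: String); field a holds the rowtime.
class Order {
    final long a;
    final String b;
    Order(long a, String b) { this.a = a; this.b = b; }
}

// HYPOTHETICAL sketch of the proposed DefaultWatermarkAssigner; NOT an existing
// Flink class. Assumption: records are at most maxOutOfOrdernessMs out of order.
class DefaultWatermarkAssigner<T> {
    private final ToLongFunction<T> rowtimeExtractor; // reads the rowtime field, e.g. Order.a
    private final long maxOutOfOrdernessMs;           // assumed configurable bound
    private long maxSeenTimestamp = Long.MIN_VALUE;   // MIN_VALUE = no record seen yet

    DefaultWatermarkAssigner(ToLongFunction<T> rowtimeExtractor, long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be >= 0");
        }
        this.rowtimeExtractor = rowtimeExtractor;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called once per record: returns its timestamp and tracks the maximum seen so far.
    long extractTimestamp(T element) {
        long ts = rowtimeExtractor.applyAsLong(element);
        maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
        return ts;
    }

    // Called periodically: the watermark trails the largest timestamp by the bound.
    // Returns Long.MIN_VALUE while no record has been seen.
    long currentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrdernessMs;
    }
}
{code}

For example, with {{new DefaultWatermarkAssigner<Order>(o -> o.a, 5000L)}}, after records with rowtimes 10000 and 8000 the current watermark would be 5000. Making the bound a parameter is what would make the default "configurable".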

Besides, I found a minor issue in the SQL.html document. It uses the name
"rowtime" for the field itself ({{tableEnv.registerDataStream("Orders", ds,
"user, product, amount, proctime.proctime, rowtime.rowtime")}}), which is
identical to the keyword. Readers may be confused about whether they should use
the field name or the "rowtime" keyword in SQL queries.

Thanks, Xingcan

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently, watermarks are applied and emitted immediately by
> {{AbstractStreamOperator}}:
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>       if (timeServiceManager != null) {
>               timeServiceManager.advanceWatermark(mark);
>       }
>       output.emitWatermark(mark);
> }
> {code}
> Some results (with timestamp fields) that are triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> downstream operators, since their timestamps are less than or equal to
> the watermarks that triggered them.
> This issue aims to add another "working mode" to the current operators,
> which supports holding back watermarks. These watermarks should be blocked
> and stored by the operators until all the corresponding newly generated
> results have been emitted.
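The holding-back mode described in the issue could be sketched as follows. This is plain Java, not Flink's {{AbstractStreamOperator}} API: {{HoldingBackWatermarkOperator}}, {{registerPendingResult()}}, {{emitResult()}} and the String-encoded output list are hypothetical names chosen for illustration. As a design choice, instead of blocking a watermark entirely, it forwards the largest watermark that is still safe, i.e. the minimum of the received watermark and the smallest pending result timestamp minus one.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// HYPOTHETICAL sketch of a "holding back watermarks" working mode; NOT Flink code.
// Invariant: a watermark w is forwarded only when no pending result has a
// timestamp <= w, so downstream operators never see such a result as late.
// Assumption: pending results are registered before the watermark passes them.
class HoldingBackWatermarkOperator {
    private final PriorityQueue<Long> pendingResultTimestamps = new PriorityQueue<>();
    private long inputWatermark = Long.MIN_VALUE;   // latest watermark received
    private long emittedWatermark = Long.MIN_VALUE; // latest watermark forwarded
    final List<String> output = new ArrayList<>();  // stands in for the operator's Output

    // A result with this timestamp will be produced later
    // (e.g. a join row still waiting for its partner).
    void registerPendingResult(long timestamp) {
        pendingResultTimestamps.add(timestamp);
    }

    // The pending result has been computed: emit it, then try to release watermarks.
    void emitResult(long timestamp) {
        if (!pendingResultTimestamps.remove(timestamp)) {
            throw new IllegalStateException("no pending result at " + timestamp);
        }
        output.add("R@" + timestamp);
        maybeEmitWatermark();
    }

    // Unlike processWatermark() above, the watermark is stored rather than
    // forwarded immediately; only the safe part is emitted.
    void processWatermark(long watermark) {
        inputWatermark = Math.max(inputWatermark, watermark);
        maybeEmitWatermark();
    }

    private void maybeEmitWatermark() {
        long safe = inputWatermark;
        Long minPending = pendingResultTimestamps.peek();
        if (minPending != null) {
            // Hold back to just below the smallest pending result timestamp.
            safe = Math.min(safe, minPending - 1);
        }
        if (safe > emittedWatermark) { // forwarded watermarks must stay monotonic
            emittedWatermark = safe;
            output.add("W@" + safe);
        }
    }
}
{code}

For example, with a pending result at 100 and an incoming watermark of 150, the sketch first emits {{W@99}}; once the result is produced it emits {{R@100}} followed by the held-back {{W@150}}.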



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)