[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16103282#comment-16103282 ]
Xingcan Cui commented on FLINK-7245:
------------------------------------

Hi [~fhueske],

it took me a little time to understand how rowtime works in the current Table/SQL API. To continue the work, I'd like to share my understanding and some questions, which may be a little *detailed*. I wonder if you could help confirm or answer them.

Suppose there's a class {{Order(a:Long, b:String)}}.

# When registering a rowtime with the API, e.g., {{tEnv.registerDataStream("OrderA", orderA, 'a.rowtime, 'b)}}, I think the current logic is that field {{a}} is shaded in the physical schema and an extra indicator for the rowtime field is added to the logical schema. I found the following snippet in {{StreamTableEnvironment.extractRowtime()}}.
{code:java}
if (mappedIdx < 0) {
  throw new TableException(
    s"The rowtime attribute can only replace a valid field. " +
    s"${origName.getOrElse(name)} is not a field of type $streamType.")
}
{code}
However, when I tried {{tEnv.registerDataStream("OrderA", orderA, 'a, 'b, 'c.rowtime)}}, the stream was also registered successfully, with a field {{c}} added. I know that both allowing and disallowing the extra field make sense, but I am still confused about which behavior is intended.
# When translating a SQL query, the rowtime field is omitted by the initial "{{Order}} to {{CRow}} operator".
# The planner checks whether the rowtime field is used in the SQL query. If it is, this special field is set via the {{ctx.timestamp()}} method in the following operator with a generated function.
# The user has to assign watermarks manually before registering the datastream. Now that the rowtime field will be treated as a common field, shall we consider adding a configurable {{DefaultWatermarkAssigner}} for the case where none is provided?

Besides, I found a minor issue in the SQL.html document. It uses the identical name "rowtime" for the field ({{tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime")}}). Readers may be confused about whether they should use the "field name" or the "rowtime" keyword in SQL.

Thanks,
Xingcan

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as delayed by
> the downstream operators since their timestamps must be less than or equal to
> the corresponding triggers.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results are
> emitted.
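
The "holding back" mode described in the issue could be modeled roughly as follows. This is only a minimal, Flink-free sketch under stated assumptions: {{HoldBackSketch}}, {{HoldingOperator}}, {{Output}} and all method names are hypothetical and are not part of Flink's API, and results are represented by their timestamps only. It illustrates the idea of storing an incoming watermark and releasing it only after every pending result with a timestamp less than or equal to that watermark has been emitted. It is not the implementation proposed for {{AbstractStreamOperator}}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, Flink-free model of an operator that holds back watermarks. */
public class HoldBackSketch {

    /** Records everything sent downstream, in emission order. */
    static final class Output {
        final List<String> events = new ArrayList<>();

        void emitRecord(long timestamp) {
            events.add("record@" + timestamp);
        }

        void emitWatermark(long timestamp) {
            events.add("watermark@" + timestamp);
        }
    }

    /** Stores the latest watermark until all results it covers are emitted. */
    static final class HoldingOperator {
        private final Output output;
        // Timestamps of results that are computed but not yet emitted (assumption).
        private final PriorityQueue<Long> pending = new PriorityQueue<>();
        private boolean hasHeldWatermark = false;
        private long heldWatermark = Long.MIN_VALUE;

        HoldingOperator(Output output) {
            this.output = output;
        }

        void addPendingResult(long timestamp) {
            pending.add(timestamp);
        }

        /** Instead of forwarding the watermark instantly, store it. */
        void processWatermark(long watermark) {
            heldWatermark = hasHeldWatermark ? Math.max(heldWatermark, watermark) : watermark;
            hasHeldWatermark = true;
            tryRelease();
        }

        /** Emits the pending result with the smallest timestamp, if any. */
        void emitNextResult() {
            Long next = pending.poll();
            if (next != null) {
                output.emitRecord(next);
            }
            tryRelease();
        }

        /** Forwards the held watermark once no pending result is covered by it. */
        private void tryRelease() {
            boolean covered = !pending.isEmpty() && pending.peek() <= heldWatermark;
            if (hasHeldWatermark && !covered) {
                output.emitWatermark(heldWatermark);
                hasHeldWatermark = false;
            }
        }
    }

    public static void main(String[] args) {
        Output out = new Output();
        HoldingOperator op = new HoldingOperator(out);
        op.addPendingResult(5L);
        op.addPendingResult(8L);
        op.addPendingResult(20L);
        op.processWatermark(10L); // held back: results 5 and 8 are still pending
        op.emitNextResult();      // emits record@5
        op.emitNextResult();      // emits record@8, then releases watermark@10
        System.out.println(out.events); // [record@5, record@8, watermark@10]
    }
}
```

In this model a downstream operator never sees the watermark for 10 before the results with timestamps 5 and 8, while the result with timestamp 20 does not block it. How results become "pending" in a real operator, and how held watermarks interact with timers and checkpoints, is left open here.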