[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121855#comment-16121855 ]
Xingcan Cui commented on FLINK-7245:
------------------------------------

Hi [~fhueske], I've got some new ideas about the rowtime/watermark.

Currently, in an operator with two inputs, the watermarks from the two streams are merged in advance and only the lower one is retained. For example, given two streams {{S1}} and {{S2}}, if {{S1}} is generated in real time while {{S2}} lags two hours behind, the watermarks from {{S1}} will be discarded entirely, since they are always higher than those of {{S2}}.

Maybe we could make watermarks from different streams distinguishable and apply/emit all of them. Specifically, I think we could add an extra field to the {{Watermark}} class that indicates which rowtime field of a row the watermark corresponds to. A single operator could have at most {{n}} activated rowtime fields (where {{n}} is the number of inputs), which may be deduced from the query conditions, and only the watermarks corresponding to those fields would be *held back/applied* in the operator. All watermarks should still be emitted to downstream operators. As a consequence, all timestamps (which are stored in rows) and all watermarks are preserved, and users can operate on different rowtime fields at different levels of a nested query.

As a running example, consider the following query.

{code:sql}
SELECT
  COUNT(S1.a) OVER (
    PARTITION BY S1.key
    ORDER BY S2.rowtime
    RANGE BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM (
  SELECT * FROM S1, S2
  WHERE S1.key = S2.key
    AND S1.rowtime >= S2.rowtime - INTERVAL '10' SECOND
    AND S1.rowtime < S2.rowtime + INTERVAL '10' SECOND)
{code}

We could replace {{ORDER BY S2.rowtime}} with {{ORDER BY S1.rowtime}}, since the timestamps and watermarks of both streams are preserved. What do you think?
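To make the proposal above more concrete, here is a toy model in Java (9+). It is not Flink's API: {{TaggedWatermark}}, {{PerFieldOperator}}, and the integer field indices are hypothetical names for this sketch, and "applying" a watermark is reduced to recording it in a list. It contrasts the current min-merging of two input watermarks with keeping one watermark per rowtime field, applying only the activated fields, and forwarding every watermark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PerFieldWatermarks {

    /** Hypothetical watermark tagged with the index of the rowtime field it tracks. */
    static final class TaggedWatermark {
        final int rowtimeField;
        final long timestamp;

        TaggedWatermark(int rowtimeField, long timestamp) {
            this.rowtimeField = rowtimeField;
            this.timestamp = timestamp;
        }
    }

    /** Current two-input behaviour: only the lower of the two input watermarks survives. */
    static long mergedWatermark(long input1, long input2) {
        return Math.min(input1, input2);
    }

    /** Hypothetical operator that keeps one watermark per rowtime field. */
    static final class PerFieldOperator {
        // Rowtime fields activated by this operator's query conditions (at most one per input).
        private final Set<Integer> activeFields;
        // Latest watermark seen for each rowtime field.
        private final Map<Integer, Long> current = new HashMap<>();
        // Watermarks this operator applies itself (e.g. to fire join or window timers).
        final List<TaggedWatermark> applied = new ArrayList<>();
        // Watermarks forwarded downstream; stands in for output.emitWatermark(...).
        final List<TaggedWatermark> emitted = new ArrayList<>();

        PerFieldOperator(Set<Integer> activeFields) {
            this.activeFields = activeFields;
        }

        void processWatermark(TaggedWatermark mark) {
            // Keep watermarks of different fields apart instead of taking the minimum.
            current.merge(mark.rowtimeField, mark.timestamp, Long::max);
            if (activeFields.contains(mark.rowtimeField)) {
                applied.add(mark);
            }
            // Every watermark is forwarded, so downstream operators can use any rowtime field.
            emitted.add(mark);
        }

        long watermarkOf(int field) {
            return current.getOrDefault(field, Long.MIN_VALUE);
        }
    }

    public static void main(String[] args) {
        long twoHours = 2L * 60 * 60 * 1000;
        long now = 1_000_000_000L;

        // S1 is real-time while S2 lags two hours: the merged watermark only reflects S2.
        System.out.println(mergedWatermark(now, now - twoHours) == now - twoHours); // true

        // Field 0 = S1.rowtime, field 1 = S2.rowtime; only S2.rowtime is activated here.
        PerFieldOperator op = new PerFieldOperator(Set.of(1));
        op.processWatermark(new TaggedWatermark(0, now));
        op.processWatermark(new TaggedWatermark(1, now - twoHours));

        System.out.println(op.watermarkOf(0) == now); // true: S1's progress is kept
        System.out.println(op.applied.size());        // 1: only the S2.rowtime watermark is applied
        System.out.println(op.emitted.size());        // 2: both watermarks are forwarded
    }
}
```

With {{S1.rowtime}} as field 0 and {{S2.rowtime}} as field 1, the merged watermark only reflects the lagging {{S2}}, whereas the per-field state keeps {{S1}}'s progress available, e.g. for a downstream {{ORDER BY S1.rowtime}}.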
> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently, the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} immediately.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> the downstream operators, since their timestamps must be less than or equal
> to the corresponding triggering watermarks.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results
> are emitted.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
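The "working mode" described in the quoted issue can be illustrated with the toy model below. It assumes, hypothetically, that the operator knows the timestamps of results it has computed but not yet emitted; {{Operator}}, {{bufferResult}}, {{flushResults}}, and the string-based output list are invented for this sketch and are not part of {{AbstractStreamOperator}}. In hold-back mode, a watermark is forwarded only once no pending result has a timestamp less than or equal to it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

public class HoldBackSketch {

    /** Hypothetical operator with an optional "hold back watermarks" working mode. */
    static final class Operator {
        private final boolean holdBack;
        // Timestamps of results that were computed but not yet emitted (smallest first).
        private final PriorityQueue<Long> pendingResults = new PriorityQueue<>();
        // Watermarks that were received but are still blocked, oldest first.
        private final Deque<Long> heldWatermarks = new ArrayDeque<>();
        // Stands in for the operator's output (records and watermarks in emission order).
        final List<String> output = new ArrayList<>();

        Operator(boolean holdBack) {
            this.holdBack = holdBack;
        }

        /** A result (e.g. a join match) with the given timestamp is waiting to be emitted. */
        void bufferResult(long timestamp) {
            pendingResults.add(timestamp);
        }

        void processWatermark(long watermark) {
            if (!holdBack) {
                // Current behaviour: the watermark is forwarded immediately.
                output.add("watermark@" + watermark);
                return;
            }
            heldWatermarks.addLast(watermark);
            releaseWatermarks();
        }

        /** Emits every pending result, then forwards any watermark that is no longer blocked. */
        void flushResults() {
            while (!pendingResults.isEmpty()) {
                output.add("result@" + pendingResults.poll());
            }
            releaseWatermarks();
        }

        private void releaseWatermarks() {
            // A held watermark may pass only if no pending result has a timestamp <= it.
            while (!heldWatermarks.isEmpty()
                    && (pendingResults.isEmpty() || pendingResults.peek() > heldWatermarks.peekFirst())) {
                output.add("watermark@" + heldWatermarks.pollFirst());
            }
        }
    }

    static List<String> run(boolean holdBack) {
        Operator op = new Operator(holdBack);
        op.bufferResult(5);
        op.processWatermark(10);
        op.flushResults();
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [watermark@10, result@5]: result@5 is late downstream
        System.out.println(run(true));  // [result@5, watermark@10]
    }
}
```

In the immediate mode, {{result@5}} reaches the downstream operator after {{watermark@10}} and would be treated as late; in hold-back mode the watermark waits until the result has been emitted.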