[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16115441#comment-16115441 ]
Xingcan Cui commented on FLINK-7245:
------------------------------------

Hi [~fhueske],

thanks for your responses. They are really helpful. I'll try to rephrase the solution to see if it's accurate now.

Whenever the new operator receives a watermark, it informs the internal timer service and then emits the watermark with a delay {{d}} added. The {{d}} could be either static or dynamic. Commonly, {{d}} is decided by the lowest future timestamp in the states of the user-defined functions. Therefore, we need a new API to report that. Also, the lowest timestamp should be snapshotted.

I've got three extra questions.
# I think there are two approaches for the UDFs to report the lowest future timestamps to the operator. (1) Add a {{setFutureTimestamp()}} to the {{Context}} of the function. (2) Add a new method {{getFutureTimestamp()}} (or a new interface) for particular functions. Which one do you prefer?
# If the second approach above is chosen, there is no need to snapshot the lowest future timestamps, right?
# Suppose the {{d}} values are different for keygroups or UDF instances, how can we coordinate them (i.e., find the global {{d}} for the operator)? I just wonder if we can treat all the operator instances as isomorphic, i.e., for a given watermark, an identical {{d}} should be reported by different keygroups or operator instances. Do you think that makes sense?

Best,
Xingcan

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as delayed by
> the downstream operators since their timestamps must be less than or equal to
> the corresponding triggers.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding new generated results are
> emitted.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
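
For illustration only, the hold-back behaviour discussed in the comment could be sketched roughly as follows. This is a standalone toy, not Flink code: {{HoldBackSketch}}, {{DelayProvider}}, {{getDelay()}} and the stand-in {{Operator}} are hypothetical names loosely following approach (2), and it assumes that "emitting the watermark with a delay {{d}} added" means forwarding {{mark - d}} downstream while the timer service still sees the original {{mark}}.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: none of these types exist in Flink; all names are hypothetical. */
class HoldBackSketch {

    /** Hypothetical hook in the spirit of approach (2): reports the delay d for a watermark. */
    interface DelayProvider {
        long getDelay(long watermark); // d >= 0, in the same unit as the watermark
    }

    /** Stand-in for an operator; records what it would pass to the timer service and downstream. */
    static class Operator {
        private final DelayProvider provider;
        private final List<Long> emitted = new ArrayList<>();
        private long timerWatermark = Long.MIN_VALUE;
        private long lastEmitted = Long.MIN_VALUE;

        Operator(DelayProvider provider) {
            this.provider = provider;
        }

        void processWatermark(long mark) {
            // Step 1: the internal timer service still sees the original watermark.
            timerWatermark = mark;

            // Step 2: ask the function how far to hold the watermark back.
            long d = provider.getDelay(mark);
            if (d < 0) {
                throw new IllegalArgumentException("delay must be non-negative: " + d);
            }

            // Assumption: "delay d added" means emitting (mark - d); saturate to avoid underflow.
            long held = (mark < Long.MIN_VALUE + d) ? Long.MIN_VALUE : mark - d;

            // Keep emitted watermarks strictly increasing even if d grows between calls.
            if (held > lastEmitted) {
                lastEmitted = held;
                emitted.add(held);
            }
        }

        long timerWatermark() { return timerWatermark; }

        List<Long> emitted() { return emitted; }
    }

    public static void main(String[] args) {
        Operator op = new Operator(wm -> 5L); // static d = 5
        op.processWatermark(10L);
        op.processWatermark(20L);
        System.out.println(op.emitted()); // prints [5, 15]
    }
}
```

With a dynamic {{d}}, the monotonicity check in the sketch simply suppresses a held-back watermark that would fall behind one already emitted. How different {{d}} values from several keygroups would be combined is left open here, as in question 3 above.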