[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099844#comment-16099844 ]
Fabian Hueske commented on FLINK-7245:
--------------------------------------

Hi [~xccui],

just throwing in some of the ideas I had so far. Not sure if all of them make sense, but maybe they are helpful:

- Implement the operator as an extension of the existing operators, i.e., we do not modify the existing lower-level operators but introduce a new one (or maybe two, one for single input and one for dual input).
- The operator needs a hook to report the lowest timestamp that it will emit in the future. Since watermarks are global in Flink (not per key), this is operator-level information.
- The lowest timestamp must be checkpointed as operator state. Since keys might be redistributed when we restore a job from a savepoint, we need a consistent way to restore this information. We can use the {{OperatorStateStore.getUnionListState()}} state type, which restores the list of all lowest timestamps to every restored operator instance. Each instance would then use the lowest timestamp in that list.
- Updating the lowest timestamp will require some care. Since rows are processed per key, we need to make sure that rows of new keys are handled correctly. In principle, we have to ensure that no row which was not late when it arrived becomes late when it leaves the operator.
- We need a strategy for when to emit watermarks. My first idea would be to emit a watermark with the lowest future timestamp - 1 whenever the operator receives a watermark.

Looking forward to your design doc.

Cheers, Fabian

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> the downstream operators, since their timestamps must be less than or equal to
> the timestamps of the watermarks that triggered them.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results are
> emitted.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
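The hold-back strategy discussed in this thread can be simulated without Flink. The following is a minimal sketch, assuming a hypothetical helper class `WatermarkHoldBack` (not a Flink API; all of its method names are illustrative). It tracks the timestamps of buffered rows that are still to be emitted, and on every incoming watermark it forwards min(incoming watermark, lowest future timestamp - 1), as proposed in the comment, without ever letting the output watermark go backwards. For checkpointing, each instance snapshots its lowest future timestamp. On restore it receives the values of all instances, as union list state would deliver them, and keeps the minimum. Wiring this into a real operator through {{OperatorStateStore.getUnionListState()}} is not shown. When the restored floor may safely be released is also left open here, as it is in the comment.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical helper (not part of Flink): holds back watermarks so that
// results buffered by an operator are not late when they are finally emitted.
class WatermarkHoldBack {

    // Multiset of timestamps of buffered rows that have not been emitted yet.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();

    // Lower bound restored from a checkpoint (minimum over all instances).
    private long restoredFloor = Long.MAX_VALUE;

    // Last watermark forwarded downstream; watermarks must never regress.
    private long lastEmitted = Long.MIN_VALUE;

    // Called when a row with the given timestamp is buffered for later emission.
    void register(long timestamp) {
        pending.merge(timestamp, 1, Integer::sum);
    }

    // Called when a previously buffered row has been emitted downstream.
    void unregister(long timestamp) {
        pending.computeIfPresent(timestamp, (ts, count) -> count > 1 ? count - 1 : null);
    }

    // Lowest timestamp this instance may still emit, or Long.MAX_VALUE if none.
    long lowestFutureTimestamp() {
        long lowestPending = pending.isEmpty() ? Long.MAX_VALUE : pending.firstKey();
        return Math.min(lowestPending, restoredFloor);
    }

    // Called for every incoming watermark; returns the watermark to forward,
    // or empty if forwarding would not advance the output watermark.
    OptionalLong onWatermark(long incoming) {
        long lowest = lowestFutureTimestamp();
        // Nothing pending: no hold-back. Otherwise cap at (lowest - 1),
        // guarding against underflow at Long.MIN_VALUE.
        long bound = lowest == Long.MAX_VALUE ? Long.MAX_VALUE
                : lowest == Long.MIN_VALUE ? Long.MIN_VALUE : lowest - 1;
        long candidate = Math.min(incoming, bound);
        if (candidate <= lastEmitted) {
            return OptionalLong.empty();
        }
        lastEmitted = candidate;
        return OptionalLong.of(candidate);
    }

    // Value this instance would contribute to the union list state on checkpoint.
    long snapshot() {
        return lowestFutureTimestamp();
    }

    // On restore, every instance sees all snapshotted values and keeps the minimum.
    void restore(List<Long> unionOfLowestTimestamps) {
        long min = Long.MAX_VALUE;
        for (long ts : unionOfLowestTimestamps) {
            min = Math.min(min, ts);
        }
        restoredFloor = min;
    }

    // Releases the restored bound; deciding when this is safe is not covered here.
    void clearRestoredFloor() {
        restoredFloor = Long.MAX_VALUE;
    }
}
```

For example, with rows at timestamps 120 and 150 still buffered, an incoming watermark of 200 is forwarded as 119. Once the row at 120 has been emitted, the next watermark of 200 is forwarded as 149. Keeping a sorted multiset makes the "lowest future timestamp" lookup cheap. A real operator would have to keep this bookkeeping consistent with its keyed state, which is exactly the care the comment calls for when handling rows of new keys.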