[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114167#comment-16114167 ]
Xingcan Cui commented on FLINK-7245:
------------------------------------

Hi all,

I'd like to share some basic ideas about the design.

# To support holding back watermarks, I plan to cache all received watermarks in a priority queue in the {{InternalTimeServiceManager}} and expose the methods needed to access it (e.g., {{peek()}} and {{poll()}}).
# For the {{advanceWatermark()}} method in {{InternalTimeServiceManager}}, I think we can add a boolean parameter that indicates whether the watermark should be cached.
# A {{triggerWatermark()}} method should be added to a new {{WatermarkPostponableOperator}}. It can contain a default emitting mechanism (i.e., remove some watermarks from the cache and emit them) or be (partially) user-defined in the future.
# The {{processWatermark()}} method in {{AbstractStreamOperator}} can then be overridden in the {{WatermarkPostponableOperator}}.
# The cached watermarks can be snapshotted and restored with the {{snapshotStateForKeyGroup()}} and {{restoreStateForKeyGroup()}} methods in {{InternalTimeServiceManager}}.

One question remains. For an operator with two inputs, the current {{AbstractStreamOperator}} handles their watermarks by merging them in advance, i.e.,

{code:java}
public void processWatermark1(Watermark mark) throws Exception {
    input1Watermark = mark.getTimestamp();
    // Only the minimum of both inputs can safely be forwarded.
    long newMin = Math.min(input1Watermark, input2Watermark);
    // Forward only if the combined watermark actually advances.
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}

public void processWatermark2(Watermark mark) throws Exception {
    input2Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}
{code}

I'm not sure whether we should add two separate queues for them or just keep the current mechanism. What do you think, [~fhueske], [~aljoscha], and [~jark]?
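The five design points above can be sketched in plain Java roughly as follows. This is a minimal sketch under stated assumptions, not Flink code: watermarks are modeled as {{long}} timestamps, "emitting" appends to a list that stands in for {{output.emitWatermark()}}, and the class {{WatermarkHoldingOperator}} with its {{snapshot()}}/{{restore()}} methods is hypothetical. The release rule in {{triggerWatermark()}} (emit a held watermark only if it is strictly smaller than the smallest timestamp of any result not yet emitted) is one possible default, assumed here because Flink treats an element whose timestamp is less than or equal to the current watermark as late.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not Flink API: watermarks are plain long timestamps and
// the "emitted" list stands in for output.emitWatermark().
class WatermarkHoldingOperator {
    // Min-heap of held-back watermarks; the oldest one is at the head (point 1).
    private final PriorityQueue<Long> cache = new PriorityQueue<>();
    final List<Long> emitted = new ArrayList<>();

    // Mirrors the proposed boolean "cache" flag on advanceWatermark() (point 2).
    void advanceWatermark(long timestamp, boolean cacheIt) {
        // In Flink, event-time timers would be advanced here first.
        if (cacheIt) {
            cache.add(timestamp);
        } else {
            emitted.add(timestamp);
        }
    }

    Long peek() {
        return cache.peek();
    }

    Long poll() {
        return cache.poll();
    }

    // Assumed default emitting mechanism (point 3): release every held watermark
    // strictly below the smallest timestamp of results not yet emitted, so those
    // results cannot be regarded as late downstream.
    void triggerWatermark(long minPendingResultTimestamp) {
        while (!cache.isEmpty() && cache.peek() < minPendingResultTimestamp) {
            emitted.add(cache.poll());
        }
    }

    // Held-back watermarks become part of the checkpointed state (point 5).
    List<Long> snapshot() {
        return new ArrayList<>(cache);
    }

    void restore(List<Long> state) {
        cache.clear();
        cache.addAll(state);
    }
}
```

For example, with watermarks 10 and 20 held and a pending result at timestamp 15, {{triggerWatermark(15)}} releases only 10; 20 stays in the cache until that result has been emitted.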
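For the two-input question, the option of keeping the current mechanism can be sketched as follows: both inputs are merged with the same logic as {{processWatermark1()}}/{{processWatermark2()}} above, and only the combined watermark enters a single hold-back queue. This is again a hypothetical, self-contained sketch rather than Flink code: {{TwoInputHoldBack}}, {{cachedCount()}} and the release rule in {{triggerWatermark()}} are assumptions, watermarks are plain {{long}} values, and the per-input watermarks are assumed to start at {{Long.MIN_VALUE}}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of "merge first, then hold back" for a two-input operator.
class TwoInputHoldBack {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Single queue that only ever receives combined watermarks.
    private final PriorityQueue<Long> cache = new PriorityQueue<>();
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        mergeAndCache();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        mergeAndCache();
    }

    private void mergeAndCache() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        // The combined watermark only moves forward, so the queue receives a
        // strictly increasing sequence of timestamps.
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            cache.add(combinedWatermark);
        }
    }

    // Same assumed release rule as in the single-input sketch.
    void triggerWatermark(long minPendingResultTimestamp) {
        while (!cache.isEmpty() && cache.peek() < minPendingResultTimestamp) {
            emitted.add(cache.poll());
        }
    }

    int cachedCount() {
        return cache.size();
    }
}
```

Merging first keeps one queue whose entries are already ordered. The alternative of two separate queues is not shown; it would presumably have to recompute the minimum across both queues every time a watermark is released.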
Best,
Xingcan

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently, watermarks are applied and emitted instantly by the
> {{AbstractStreamOperator}}.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> the downstream operators, since their timestamps must be less than or equal to
> those of the watermarks that triggered them.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results
> have been emitted.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)