[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101562#comment-16101562 ]
Fabian Hueske commented on FLINK-7245:
--------------------------------------

Great, thanks for the response [~xccui]!

Ad 2) No, not really for monitoring. I envision the new operator as a skeleton that automatically takes care of delaying the watermarks. In order to do that, the custom user code would need to report the smallest timestamps that will be emitted in the future. I called the method to report these timestamps a "hook", which is probably not the most appropriate term.

Ad 3) Hmm, I was thinking about this a bit. What I said before about {{OperatorStateStore.getUnionListState()}} would not work. We need to checkpoint the smallest future timestamp for each key. Since keys can be assigned to different operators in case of rescaling or recovery, we need to ensure that this information is kept together with the keys. However, operators have no access to keys but only to key groups (key groups are the unit of key distribution in Flink). Hence, we need to keep a priority queue for each key group and checkpoint it together with the key group. If a key group is moved to a different operator, its priority queue will be moved there as well. I talked to [~aljoscha] and we can do this only on the lowest level of operator abstraction, which is the {{AbstractStreamOperator}}. Have a look at the {{snapshotState()}} method, which iterates over all key groups to snapshot timer information. I have to admit, I'm not familiar with the source code at this level.

Ad 4) Each operator keeps track of all watermarks it receives from all input channels (i.e., each parallel instance of each input operator). For each input channel, it keeps the maximum (i.e., latest received) watermark and computes its own watermark (which is emitted via all outgoing channels) as the minimum of those per-channel maximum watermarks. So, each operator has only a single watermark (i.e., not a watermark per key).
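To make the rule in Ad 4 concrete, here is a minimal, self-contained sketch of how per-channel maximums can be combined into a single operator watermark. It is not Flink's actual code: the class {{MinWatermarkTracker}} and its method names are hypothetical and chosen only for illustration, and watermarks are modeled as plain {{long}} timestamps.

```java
import java.util.Arrays;

/**
 * Hypothetical helper (not a Flink class): combines the watermarks of
 * several input channels into one operator watermark.
 */
final class MinWatermarkTracker {
    // Highest watermark received so far on each input channel.
    private final long[] channelWatermarks;
    // Watermark of the operator itself; it never moves backwards.
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator watermark. */
    long onWatermark(int channel, long timestamp) {
        // Per channel, keep the maximum (latest) watermark.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);

        // The operator watermark is the minimum over all channel maximums.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

With two channels, reporting 10 on channel 0 and then 5 on channel 1 yields an operator watermark of 5. The tracker holds one value per channel and none per key, which is why the operator as a whole has only a single watermark.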
However, our custom code will have to operate on multiple keys, and keys are strictly separated from each other such that no key is aware of the lowest timestamp of the others.

Ad 5) That's a good point. We can make that configurable as well. I thought emitting a watermark when a watermark is received would be a good default.

Cheers,
Fabian

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as delayed by
> the downstream operators since their timestamps must be less than or equal to
> the corresponding triggers.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding new generated results are
> emitted.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)