[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101562#comment-16101562
 ] 

Fabian Hueske commented on FLINK-7245:
--------------------------------------

Great, thanks for the response [~xccui]!

Ad 2) No, not really for mentoring. I envision the new operator as a skeleton 
that automatically takes care of delaying the watermarks. In order to do that, 
the custom user code would need to report the smallest timestamps which will be 
emitted in the future. I called the method to report these timestamps "hook", 
probably not the most appropriate term.
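To make the "skeleton plus hook" idea concrete, here is a minimal sketch that does not depend on Flink. The class name {{WatermarkHoldingSkeleton}}, the hook name {{smallestFutureTimestamp()}}, and the rule "forward min(incoming watermark, smallest future timestamp - 1)" are all assumptions for illustration, not an existing Flink API or the agreed design.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical skeleton (not a Flink class): it delays watermarks so that they
 * never overtake a timestamp the user code has announced it will still emit.
 */
abstract class WatermarkHoldingSkeleton {
    /** Stands in for output.emitWatermark(...); records what would be emitted. */
    final List<Long> emittedWatermarks = new ArrayList<>();
    private long lastEmitted = Long.MIN_VALUE;

    /**
     * The "hook": the smallest timestamp the user code may still emit,
     * or Long.MAX_VALUE if nothing is pending.
     */
    protected abstract long smallestFutureTimestamp();

    final void processWatermark(long incoming) {
        long smallest = smallestFutureTimestamp();
        // Assumption: a watermark t promises that no record with timestamp <= t follows,
        // so the forwarded watermark must stay strictly below the smallest pending timestamp.
        long bound = (smallest == Long.MAX_VALUE || smallest == Long.MIN_VALUE)
                ? smallest
                : smallest - 1;
        long held = Math.min(incoming, bound);
        // Only emit if the held-back watermark actually advances.
        if (held > lastEmitted) {
            lastEmitted = held;
            emittedWatermarks.add(held);
        }
    }
}
```

A subclass would only implement the hook; the skeleton decides when and which watermark to forward.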

Ad 3) Hmm, I was thinking about this a bit. What I said before about the 
{{OperatorStateStore.getUnionListState()}} would not work. We need to 
checkpoint the smallest future timestamp for each key. Since keys can be 
assigned to different operators in case of rescaling or recovery, we need to 
ensure that this information is kept together with the keys. However, operators 
have no access to keys but only to key groups (key groups are the unit of key 
distribution in Flink). Hence, we need to keep PriorityQueues for each key 
group and checkpoint those together with the key group. If a key group is moved 
to a different operator, the priority queue will be moved there as well. I 
talked to [~aljoscha] and we can do this only on the lowest level of operator 
abstraction which is the {{AbstractStreamOperator}}. Have a look at the 
{{snapshotState()}} method which iterates over all key groups to snapshot timer 
information. I have to admit, I'm not familiar with the source code at this 
level.
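As a rough sketch of what "one priority queue per key group" could look like, independent of Flink's actual state classes: the class {{KeyGroupedTimestampQueues}} and its methods are hypothetical, and the key-group assignment via {{Math.floorMod(hashCode, maxParallelism)}} is a simplification (Flink additionally applies a hash function before the modulo).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical holder of future timestamps, organized by key group. */
final class KeyGroupedTimestampQueues {
    private final int maxParallelism;
    private final Map<Integer, PriorityQueue<Long>> queues = new HashMap<>();

    KeyGroupedTimestampQueues(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    /** Simplified key-group assignment (assumption, see lead-in). */
    int keyGroupOf(Object key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Registers a timestamp that will be emitted in the future for the given key. */
    void register(Object key, long futureTimestamp) {
        queues.computeIfAbsent(keyGroupOf(key), g -> new PriorityQueue<>())
              .add(futureTimestamp);
    }

    /** Smallest pending timestamp over all local key groups, or Long.MAX_VALUE if none. */
    long smallestFutureTimestamp() {
        long min = Long.MAX_VALUE;
        for (PriorityQueue<Long> q : queues.values()) {
            if (!q.isEmpty()) {
                min = Math.min(min, q.peek());
            }
        }
        return min;
    }

    /** Hands over one key group's timestamps, e.g. when it is reassigned on rescaling. */
    List<Long> removeKeyGroup(int keyGroup) {
        PriorityQueue<Long> q = queues.remove(keyGroup);
        return q == null ? new ArrayList<>() : new ArrayList<>(q);
    }

    /** Restores a key group's timestamps on the instance that now owns it. */
    void restoreKeyGroup(int keyGroup, List<Long> timestamps) {
        queues.computeIfAbsent(keyGroup, g -> new PriorityQueue<>()).addAll(timestamps);
    }
}
```

Because each queue is tied to a key group rather than to an operator instance, moving a key group carries its pending timestamps along, which is the property described above.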

Ad 4) Each operator keeps track of all watermarks it receives from all input 
channels (i.e., each parallel instance of each input operator). For each input 
channel, it keeps the maximum (i.e., latest received) watermark and computes its 
own watermark (which is emitted via all outgoing channels) as the minimum of 
those maximum input channel watermarks. So, each operator has only a single 
watermark (i.e., not a watermark per key). However, our custom code will have 
to operate on multiple keys and keys are strictly separated from each other 
such that no key is aware of the lowest timestamp of the others.
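The min-of-max rule described above can be sketched as follows. {{ChannelWatermarkTracker}} is a hypothetical stand-alone class written for illustration, not Flink's internal implementation.

```java
import java.util.Arrays;

/** Hypothetical tracker: one watermark per input channel, one combined operator watermark. */
final class ChannelWatermarkTracker {
    private final long[] channelWatermarks;
    private long operatorWatermark = Long.MIN_VALUE;

    ChannelWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Until a channel has sent a watermark, it holds the operator watermark back.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark from one input channel.
     * Returns true if the operator watermark advanced and should be emitted downstream.
     */
    boolean onWatermark(int channel, long timestamp) {
        // Keep the maximum watermark seen on this channel.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);

        // The operator watermark is the minimum over all channel maxima.
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        if (min > operatorWatermark) {
            operatorWatermark = min;
            return true;
        }
        return false;
    }

    long getOperatorWatermark() {
        return operatorWatermark;
    }
}
```

Note that the tracker holds a single value per operator instance; nothing in it is keyed, which is why per-key information has to come from the user code.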

Ad 5) That's a good point. We can make that configurable as well. I thought 
emitting a watermark whenever a watermark is received would be a good default.

Cheers, Fabian

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>       if (timeServiceManager != null) {
>               timeServiceManager.advanceWatermark(mark);
>       }
>       output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding new generated results are 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
