[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100161#comment-16100161
 ] 

Xingcan Cui commented on FLINK-7245:
------------------------------------

Thanks, [~fhueske]. The ideas are quite helpful. I'd like to add some 
thoughts/questions as well.
# At first, I hesitated over whether to add switches to the current operators 
or to introduce new operators. Both approaches have their own advantages. I'll 
accept your suggestion, since implementing them as extensions seems safer and 
also easier to maintain.
# Is the "hook to report..." you mentioned meant for monitoring?
# I agree. Although the operator states can currently only be stored on the JVM 
heap, that won't be a barrier since the watermark information should not be too 
large.
# The instances of an operator may be deployed on different physical machines. 
I read the code and found that the watermarks are broadcast (with the 
{{recordWriter.broadcastEmit}} method) to downstream instances. I guess this 
means each operator instance can calculate its own lowest watermark 
separately. IMO, the global watermarks handled by different operator instances 
should be independent of the "keys". Could you explain a little more about 
the relationship between them?
# I think when to emit the watermarks should eventually be decided by the user 
function, which works like a {{PunctuatedWatermarkAssigner}}. If I understood 
correctly, you meant to postpone the watermark by one, and that can solve the 
"trigger-and-generate" problem. However, so far I cannot tell whether it 
can deal with all situations (I wish it could :)).
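
To make point 4 more concrete, here is a minimal sketch of how a single operator instance could compute its own lowest watermark from the watermarks broadcast to it. It is plain Java and does not use any Flink classes; {{MinWatermarkTracker}} and its methods are hypothetical names chosen for illustration, and the real logic in Flink may differ.

```java
import java.util.Arrays;

/**
 * Hypothetical helper (not part of Flink): tracks the latest watermark seen
 * on each input channel and the minimum across all channels.
 */
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentMin = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        if (numChannels < 1) {
            throw new IllegalArgumentException("need at least one channel");
        }
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark for one channel.
     *
     * @return true if the minimum across all channels advanced
     */
    boolean update(int channel, long watermark) {
        // Watermarks never move backwards on a channel.
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
        }
        long newMin = Arrays.stream(channelWatermarks).min().getAsLong();
        if (newMin > currentMin) {
            currentMin = newMin;
            return true;
        }
        return false;
    }

    long currentMin() {
        return currentMin;
    }
}
```

In this sketch the tracker only looks at input channels, not at keys, which is the independence I had in mind above.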


> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
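> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         // Advance event time; this may fire timers that produce results.
>         timeServiceManager.advanceWatermark(mark);
>     }
>     // Forward the watermark downstream right away.
>     output.emitWatermark(mark);
> }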
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as late by 
> the downstream operators, since their timestamps must be less than or equal to 
> those of the watermarks that triggered them. 
> This issue aims to add another "working mode" to the current operators, one 
> that supports holding back watermarks. These watermarks should be blocked and 
> stored by the operators until all the corresponding newly generated results 
> are emitted.
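
As a rough illustration of the hold-back mode described above, the following sketch stores incoming watermarks and forwards them only after the caller signals that the corresponding results are out. It is a plain Java model, not Flink code: {{HoldBackOperatorSketch}}, {{resultsEmittedUpTo}} and the string-based {{downstream}} list are all assumptions made for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model (not Flink API) of an operator that holds back watermarks. */
final class HoldBackOperatorSketch {
    /** Watermarks received but not yet forwarded downstream, in arrival order. */
    private final Deque<Long> heldWatermarks = new ArrayDeque<>();

    /** Stand-in for the downstream output: "R<ts>" is a result, "W<ts>" a watermark. */
    final List<String> downstream = new ArrayList<>();

    /** Stores the watermark instead of forwarding it immediately. */
    void processWatermark(long mark) {
        heldWatermarks.addLast(mark);
    }

    /** Emits a result produced by a watermark-triggered computation. */
    void emitResult(long timestamp) {
        downstream.add("R" + timestamp);
    }

    /** Releases every held watermark <= mark once its results have been emitted. */
    void resultsEmittedUpTo(long mark) {
        while (!heldWatermarks.isEmpty() && heldWatermarks.peekFirst() <= mark) {
            downstream.add("W" + heldWatermarks.pollFirst());
        }
    }
}
```

With this model, calling {{processWatermark(10)}}, then {{emitResult(8)}} and {{emitResult(10)}}, then {{resultsEmittedUpTo(10)}} leaves the downstream list as R8, R10, W10, so no result arrives after a watermark that already covers its timestamp. Who decides when to call the release method (the operator or the user function) is left open here.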



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
