[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114952#comment-16114952
 ] 

Fabian Hueske commented on FLINK-7245:
--------------------------------------

Thanks a lot for looking into this [~xccui]!

A few comments:
- Even though we are talking about "holding back watermarks", we don't need to
cache them. It is not necessary to emit exactly the same watermarks that we
receive. What I meant by "holding back watermarks" is that we cannot
immediately forward watermarks (as is done in
{{AbstractStreamOperator.processWatermark()}}) but must instead emit a smaller
watermark. The emitted watermark must be smaller than the lowest timestamp that
will be emitted in the future.
- An operator must track the smallest timestamp that will be emitted in the
future. The future timestamps are those of records that are held in state.
In addition, there is a bound that depends on the operator (think of it as a
window bound), the current watermark, and possibly an additional bound to
tolerate late data. Imagine you have a 1 hour tumbling window and you want to
emit records with the timestamp of the first record in that window. In such a
case, the bound would be the current watermark minus 1 hour (minus another 30
minutes if you accept data that is up to 30 minutes late).
- Because the semantics of the operator are not known to the
{{AbstractStreamOperator}} and depend on user-defined code, we need an
interface to report future timestamps and emitted timestamps to the operator
that emits watermarks. All those timestamps must be checkpointed in their
corresponding key group.
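
To make the three points above more concrete, here is a rough sketch of how the output watermark could be derived. It is plain Java and does not use any Flink classes. The class {{HeldBackWatermarkTracker}} and its methods are hypothetical names made up for illustration, not an existing or proposed Flink API. The sketch assumes millisecond timestamps, treats a watermark t as "no more records with timestamp <= t", and leaves out checkpointing and key groups entirely.

```java
import java.util.TreeMap;

/** Hypothetical helper (not part of Flink): derives a held-back output watermark. */
final class HeldBackWatermarkTracker {
    // Operator-specific bound, e.g. the window size, plus tolerated lateness.
    private final long holdBackMs;
    // Timestamps of records still held in state, as a multiset: timestamp -> count.
    private final TreeMap<Long, Integer> pendingTimestamps = new TreeMap<>();
    // Last watermark handed downstream; watermarks must never decrease.
    private long lastEmittedWatermark = Long.MIN_VALUE;

    HeldBackWatermarkTracker(long windowSizeMs, long allowedLatenessMs) {
        this.holdBackMs = windowSizeMs + allowedLatenessMs;
    }

    /** Report the timestamp of a record that will be emitted at some later point. */
    void reportFutureTimestamp(long timestamp) {
        pendingTimestamps.merge(timestamp, 1, Integer::sum);
    }

    /** Report that a previously registered record has been emitted. */
    void reportEmittedTimestamp(long timestamp) {
        // Decrement the count and drop the entry once it reaches zero.
        pendingTimestamps.computeIfPresent(timestamp, (ts, n) -> n > 1 ? n - 1 : null);
    }

    /** Returns the watermark to emit downstream for the given input watermark. */
    long onInputWatermark(long inputWatermark) {
        // Bound from the operator semantics: input watermark minus window size and lateness.
        // Clamp instead of underflowing for very small input watermarks.
        long candidate = inputWatermark < Long.MIN_VALUE + holdBackMs
                ? Long.MIN_VALUE
                : inputWatermark - holdBackMs;
        // The watermark must stay strictly below the smallest pending timestamp.
        if (!pendingTimestamps.isEmpty()) {
            candidate = Math.min(candidate, pendingTimestamps.firstKey() - 1);
        }
        // Never go backwards relative to what was already emitted.
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }
}
```

With the 1 hour window and 30 minutes of lateness from the example, {{new HeldBackWatermarkTracker(3_600_000L, 1_800_000L)}} would turn an input watermark of 10,000,000 into an output watermark of 4,600,000 as long as no earlier record is held in state. In a real operator the pending timestamps would have to live in keyed state so that they are checkpointed with their key group.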

Best, Fabian

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>       if (timeServiceManager != null) {
>               timeServiceManager.advanceWatermark(mark);
>       }
>       output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results
> are emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
