[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116601#comment-16116601 ]
Fabian Hueske commented on FLINK-7245:
--------------------------------------

Hi [~xccui],

I think this is a very good observation:

bq. The delay could be either static or dynamic.

My thinking was much too complicated. We can implement a windowed stream join with a static watermark delay, which is defined by the window bounds (possibly adjusted by a late data interval) and does not need to materialize state. Basically, we can delay the watermark by the amount of time a row is kept in state before it is discarded. Once a row is discarded, we cannot emit it anymore. So there is no immediate need to implement the more complex variant that keeps track of future record timestamps and checkpoints them.

Regarding your questions:

1. Only the first one would be feasible. A parallel task instance handles one or more key groups, each with many keys. So we would need to iterate over all keys, which would be very expensive.
2. Looking up the lowest timestamp for each key would mean querying each key's state. So, yes, we would not need additional state, but querying the existing state would be expensive.
3. Watermarks are handled per task instance. So we would need to coordinate the watermark across all keys processed by the same instance (see 1. and 2.). There would be no need for global (distributed) coordination across task instances.

I would suggest restricting this JIRA to implementing an operator with a constant watermark delay. When a watermark is received, the delay is subtracted and the resulting watermark is forwarded. This will enable the implementation of a windowed stream join.
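A minimal sketch of the constant-delay operator suggested above, written in plain Java so it runs without Flink. `Watermark`, `Output`, and `ConstantDelayWatermarkForwarder` are hypothetical simplified types, not Flink's classes, and the 5-second delay in `main` is an arbitrary assumption (in a windowed join it would be derived from the window bounds and any late data interval).

```java
import java.util.ArrayList;
import java.util.List;

public class DelayedWatermarkSketch {

    /** Hypothetical stand-in for a watermark: an event-time timestamp in milliseconds. */
    static final class Watermark {
        private final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }

        long timestamp() {
            return timestamp;
        }
    }

    /** Hypothetical stand-in for the operator's output; receives forwarded watermarks. */
    interface Output {
        void emitWatermark(Watermark mark);
    }

    /** Forwards each incoming watermark moved back by a constant delay. */
    static final class ConstantDelayWatermarkForwarder {
        private final long delayMillis;
        private final Output output;

        ConstantDelayWatermarkForwarder(long delayMillis, Output output) {
            if (delayMillis < 0) {
                throw new IllegalArgumentException("delay must be non-negative");
            }
            this.delayMillis = delayMillis;
            this.output = output;
        }

        void processWatermark(Watermark mark) {
            // A real operator would first advance its timers to mark.timestamp().
            // Results it emits are not late downstream as long as their timestamps
            // are greater than the forwarded watermark (mark.timestamp() - delayMillis).
            long t = mark.timestamp();
            // Clamp at Long.MIN_VALUE instead of underflowing.
            long held = (t < Long.MIN_VALUE + delayMillis) ? Long.MIN_VALUE : t - delayMillis;
            output.emitWatermark(new Watermark(held));
        }
    }

    public static void main(String[] args) {
        List<Long> forwarded = new ArrayList<>();
        ConstantDelayWatermarkForwarder op =
                new ConstantDelayWatermarkForwarder(5_000L, mark -> forwarded.add(mark.timestamp()));
        op.processWatermark(new Watermark(10_000L));
        op.processWatermark(new Watermark(12_000L));
        System.out.println(forwarded); // prints [5000, 7000]
    }
}
```

Because the delay is constant, this operator needs no additional state: each watermark is transformed and forwarded as soon as it arrives.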
What do you think, [~xccui]?

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the
> {{AbstractStreamOperator}} instantly.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (timeServiceManager != null) {
>         timeServiceManager.advanceWatermark(mark);
>     }
>     output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these
> watermarks (e.g., join or aggregate results) may be regarded as late by
> the downstream operators, since their timestamps must be less than or equal
> to those of the watermarks that triggered them.
> This issue aims to add another "working mode", which supports holding back
> watermarks, to the current operators. These watermarks should be blocked and
> stored by the operators until all the corresponding newly generated results
> are emitted.
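For contrast, the hold-back mode described in the issue corresponds to the dynamic variant from the comment: the operator forwards a watermark only as far as the earliest result it may still emit. The sketch below is a hypothetical, single-instance illustration in plain Java; `HoldingWatermarkOperator`, `registerPendingResult`, and `markEmitted` are made-up names, not Flink API. It deliberately ignores keys, keyed state, and checkpointing, which is exactly where the cost discussed in points 1. to 3. of the comment comes from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class HeldBackWatermarkSketch {

    /** Hypothetical sink for forwarded watermarks (not Flink's Output interface). */
    interface WatermarkSink {
        void emitWatermark(long timestamp);
    }

    /**
     * Holds the forwarded watermark below the smallest timestamp of any result
     * that may still be emitted, so those results are never late downstream.
     */
    static final class HoldingWatermarkOperator {
        // Min-heap of timestamps of results that may still be emitted.
        private final PriorityQueue<Long> pendingResultTimestamps = new PriorityQueue<>();
        private final WatermarkSink sink;
        private long inputWatermark = Long.MIN_VALUE;
        private long lastForwarded = Long.MIN_VALUE;

        HoldingWatermarkOperator(WatermarkSink sink) {
            this.sink = sink;
        }

        /** Records that a result with this timestamp may still be emitted later. */
        void registerPendingResult(long timestamp) {
            pendingResultTimestamps.add(timestamp);
        }

        /** Records that a pending result was emitted and releases the watermark if possible. */
        void markEmitted(long timestamp) {
            pendingResultTimestamps.remove(timestamp);
            maybeForward();
        }

        void processWatermark(long timestamp) {
            inputWatermark = Math.max(inputWatermark, timestamp);
            maybeForward();
        }

        private void maybeForward() {
            long candidate = inputWatermark;
            Long minPending = pendingResultTimestamps.peek();
            if (minPending != null) {
                // A watermark w promises no more records with timestamp <= w,
                // so it must stay strictly below the earliest pending result.
                long bound = (minPending == Long.MIN_VALUE) ? Long.MIN_VALUE : minPending - 1;
                candidate = Math.min(candidate, bound);
            }
            // Watermarks must not go backwards; forward only real progress.
            if (candidate > lastForwarded) {
                lastForwarded = candidate;
                sink.emitWatermark(candidate);
            }
        }
    }

    public static void main(String[] args) {
        List<Long> forwarded = new ArrayList<>();
        HoldingWatermarkOperator op = new HoldingWatermarkOperator(ts -> forwarded.add(ts));
        op.registerPendingResult(1_000L);
        op.processWatermark(5_000L); // held back to 999
        op.markEmitted(1_000L);      // released to 5000
        op.processWatermark(4_000L); // no progress, nothing forwarded
        System.out.println(forwarded); // prints [999, 5000]
    }
}
```

Here a single min-heap makes the lowest pending timestamp cheap to find. In a keyed operator there is no such index across keys, so the operator would have to scan every key's state and checkpoint the result, which is why a constant delay is the simpler first step.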