[ 
https://issues.apache.org/jira/browse/FLINK-28975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598346#comment-17598346
 ] 

Piotr Nowojski commented on FLINK-28975:
----------------------------------------

{quote}WatermarksWithIdleness (which is an impl of WatermarkGenerator) 
periodically checks whether there have been any invocations of onEvent, and 
marks the watermark output as idle if onEvent is never called within the 
idleness timeout.{quote}

Wouldn't it be more correct to not call {{output.markIdle();}} if the previous 
{{onPeriodicEmit}} call has already called it (i.e., 
{{idlenessTimer.checkIfIdle()}} returned true in the previous call)?
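
To make the question concrete, here is a minimal sketch of the edge-triggered behaviour I have in mind. It is not the actual {{WatermarksWithIdleness}} code: {{IdleSink}} and {{EdgeTriggeredIdleness}} are hypothetical names, the interface is a stand-in for the idleness part of {{WatermarkOutput}}, and the clock is passed in explicitly to keep the example self-contained.

```java
// Hypothetical stand-in for the idleness part of a watermark output.
interface IdleSink {
    void markIdle();
    void markActive();
}

// Sketch: forward markIdle() only on the transition from active to idle,
// instead of on every periodic emit while the idle condition holds.
final class EdgeTriggeredIdleness {
    private final IdleSink output;
    private final long timeoutMillis;
    private long lastEventMillis;
    private boolean idleReported = false;

    EdgeTriggeredIdleness(IdleSink output, long timeoutMillis, long nowMillis) {
        this.output = output;
        this.timeoutMillis = timeoutMillis;
        this.lastEventMillis = nowMillis;
    }

    // Called for every record; resets the idleness timer.
    void onEvent(long nowMillis) {
        lastEventMillis = nowMillis;
        if (idleReported) {
            idleReported = false;
            output.markActive();
        }
    }

    // Called periodically; reports idleness at most once per idle period.
    void onPeriodicEmit(long nowMillis) {
        boolean idle = nowMillis - lastEventMillis >= timeoutMillis;
        if (idle && !idleReported) {
            idleReported = true;
            output.markIdle();
        }
    }
}
```

With this shape, an output that never receives events would call {{markIdle()}} once after the timeout and then stay silent, rather than re-marking itself idle on every periodic emit.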

{quote}A possible solution in my mind is to add another layer on top of the 
source's WatermarkOutput that only marks the output idle if both the main and 
per-split outputs are idle. WDYT?{quote}
I think that sounds about right.
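
As a rough illustration of how I read that proposal (a sketch only, not a proposed patch): the extra layer tracks the idle state of the main output and of the combined per-split outputs separately, and forwards a status change downstream only when the conjunction changes. {{StatusSink}} and {{CombinedIdleness}} are hypothetical names and do not exist in Flink.

```java
// Hypothetical stand-in for the downstream output that receives the
// combined idle/active status of the source.
interface StatusSink {
    void markIdle();
    void markActive();
}

// Sketch: the source is idle only if BOTH the main output and the
// (already multiplexed) per-split outputs are idle.
final class CombinedIdleness {
    private final StatusSink downstream;
    private boolean mainIdle = false;
    private boolean splitsIdle = false;
    private boolean downstreamIdle = false;

    CombinedIdleness(StatusSink downstream) {
        this.downstream = downstream;
    }

    void setMainIdle(boolean idle) {
        mainIdle = idle;
        update();
    }

    void setSplitsIdle(boolean idle) {
        splitsIdle = idle;
        update();
    }

    boolean isDownstreamIdle() {
        return downstreamIdle;
    }

    // Forward a status change only when the combined state actually flips.
    private void update() {
        boolean idle = mainIdle && splitsIdle;
        if (idle != downstreamIdle) {
            downstreamIdle = idle;
            if (idle) {
                downstream.markIdle();
            } else {
                downstream.markActive();
            }
        }
    }
}
```

In the KafkaSource case from the description, the main output would repeatedly report idle while the splits keep producing events, and with a layer like this none of those reports would reach downstream.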



> withIdleness marks all streams from FLIP-27 sources as idle
> -----------------------------------------------------------
>
>                 Key: FLINK-28975
>                 URL: https://issues.apache.org/jira/browse/FLINK-28975
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.1
>            Reporter: David Anderson
>            Assignee: Qingsheng Ren
>            Priority: Blocker
>             Fix For: 1.16.0, 1.15.3
>
>
> Using withIdleness with a FLIP-27 source leads to all of the streams from the 
> source being marked idle, which in turn leads to incorrect results, e.g., 
> from joins that rely on watermarks.
> Quoting from the user ML thread:
> In org.apache.flink.streaming.api.operators.SourceOperator, there are 
> separate instances of WatermarksWithIdleness created for each split output 
> and the main output. There is multiplexing of watermarks between split 
> outputs but no multiplexing between split output and main output.
>
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource, 
> there is only output from splits and no output from main. Hence the main 
> output will (after an initial timeout) be marked as idle.
>
> The implementation of WatermarksWithIdleness is such that once an output is 
> idle, it will periodically re-mark the output as idle. Since there is no 
> multiplexing between split outputs and main output, the idle marks coming 
> from main output will repeatedly set the output to idle even though there 
> are events from the splits. The result is that the entire source is 
> repeatedly marked as idle.
> See this ML thread for more details: 
> [https://lists.apache.org/thread/bbokccohs16tzkdtybqtv1vx76gqkqj4]
> This probably affects older versions of Flink as well, but that needs to be 
> verified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
