[ 
https://issues.apache.org/jira/browse/FLINK-28975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598286#comment-17598286
 ] 

Qingsheng Ren commented on FLINK-28975:
---------------------------------------

[~pnowojski] There are two outputs in the source: the per-split output and the 
main output, and each of them has its own copy of {{WatermarkGenerator}}:

[https://github.com/apache/flink/blob/c0f080762e7a1c1763942fed2e34420164a04bf3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/ProgressiveTimestampsAndWatermarks.java#L102-L120]

{{WatermarksWithIdleness}} (an implementation of {{WatermarkGenerator}}) 
periodically checks whether {{onEvent}} has been invoked, and marks the 
watermark output as idle if {{onEvent}} was not called within the idleness 
timeout. For a source that uses only the per-split output and not the main 
output (like KafkaSource), the {{WatermarksWithIdleness}} on the main output 
never receives an event, so it keeps marking the watermark output as idle in 
{{onPeriodicEmit}}.
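
To make the interaction concrete, here is a minimal, self-contained model of the 
behavior described above. It is only a sketch and does not use Flink's real 
classes: {{RecordingOutput}}, {{IdleAwareGenerator}} and {{simulate}} are 
hypothetical names, time is passed in by hand, and marking the output active on 
every event is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the reported behavior; these are NOT Flink's classes. */
class IdlenessModel {

    /** Hypothetical stand-in for the shared downstream output; records status changes. */
    static class RecordingOutput {
        final List<String> log = new ArrayList<>();
        void markIdle() { log.add("IDLE"); }
        void markActive() { log.add("ACTIVE"); }
    }

    /** Hypothetical generator that is idle once no event arrived for timeoutMs. */
    static class IdleAwareGenerator {
        private final long timeoutMs;
        private long lastEventMs;

        IdleAwareGenerator(long timeoutMs, long nowMs) {
            this.timeoutMs = timeoutMs;
            this.lastEventMs = nowMs;
        }

        void onEvent(long nowMs, RecordingOutput out) {
            lastEventMs = nowMs;
            out.markActive(); // simplification: every event re-activates the output
        }

        void onPeriodicEmit(long nowMs, RecordingOutput out) {
            if (nowMs - lastEventMs > timeoutMs) {
                out.markIdle(); // repeated on every periodic call while idle
            }
        }
    }

    /** Simulates a source that emits records only through its per-split generator. */
    static List<String> simulate() {
        RecordingOutput downstream = new RecordingOutput();
        IdleAwareGenerator perSplit = new IdleAwareGenerator(100, 0);
        IdleAwareGenerator main = new IdleAwareGenerator(100, 0);
        for (long now = 50; now <= 300; now += 50) {
            perSplit.onEvent(now, downstream);        // records keep arriving on the split
            perSplit.onPeriodicEmit(now, downstream); // never past the timeout
            main.onPeriodicEmit(now, downstream);     // no events, idle after the timeout
        }
        return downstream.log;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

In this model, once the timeout has elapsed every periodic tick of the main 
generator writes an idle mark to the shared output, even though the per-split 
generator sees an event on every tick.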

I'm not sure that adding the main output to the watermark multiplexer of the 
per-split output is the correct fix. In the current logic of 
{{ProgressiveTimestampsAndWatermarks}}, the watermark from the main output 
should be sent directly downstream instead of being combined (by taking the 
minimum) with the watermarks from the per-split outputs.

One possible solution is to add another layer on top of the source's 
WatermarkOutput that marks the output idle only if both the main and the 
per-split outputs are idle. WDYT?
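
A rough sketch of what such a layer could look like, to illustrate the idea only. 
All names here ({{StatusOutput}}, {{CombinedIdleTracker}}, {{setMainIdle}}, 
{{setSplitsIdle}}) are hypothetical and do not exist in Flink, and watermark 
forwarding is left out entirely:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposed layer; none of these names exist in Flink. */
class CombinedIdleness {

    /** Assumed minimal view of the downstream output: only the idle/active status. */
    interface StatusOutput {
        void markIdle();
        void markActive();
    }

    /** Reports idle downstream only while BOTH the main and the per-split side are idle. */
    static class CombinedIdleTracker {
        private final StatusOutput downstream;
        private boolean mainIdle;     // both sides start out active
        private boolean splitsIdle;
        private boolean reportedIdle;

        CombinedIdleTracker(StatusOutput downstream) {
            this.downstream = downstream;
        }

        void setMainIdle(boolean idle) { mainIdle = idle; update(); }

        void setSplitsIdle(boolean idle) { splitsIdle = idle; update(); }

        private void update() {
            boolean idle = mainIdle && splitsIdle;
            if (idle == reportedIdle) {
                return; // no change in combined status, so nothing is forwarded
            }
            reportedIdle = idle;
            if (idle) {
                downstream.markIdle();
            } else {
                downstream.markActive();
            }
        }
    }

    /** Replays the KafkaSource-like case: main goes idle while the splits stay active. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CombinedIdleTracker tracker = new CombinedIdleTracker(new StatusOutput() {
            @Override public void markIdle() { log.add("IDLE"); }
            @Override public void markActive() { log.add("ACTIVE"); }
        });
        tracker.setMainIdle(true);    // main idle, splits active: not forwarded
        tracker.setMainIdle(true);    // repeated idle mark from main: still not forwarded
        tracker.setSplitsIdle(true);  // now both idle: IDLE is forwarded
        tracker.setSplitsIdle(false); // a split produces data again: ACTIVE is forwarded
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape, the repeated idle marks from an unused main output are absorbed 
by the tracker as long as any split is still active.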

> withIdleness marks all streams from FLIP-27 sources as idle
> -----------------------------------------------------------
>
>                 Key: FLINK-28975
>                 URL: https://issues.apache.org/jira/browse/FLINK-28975
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.1
>            Reporter: David Anderson
>            Assignee: Qingsheng Ren
>            Priority: Blocker
>             Fix For: 1.16.0, 1.15.3
>
>
> Using withIdleness with a FLIP-27 source leads to all of the streams from the 
> source being marked idle, which in turn leads to incorrect results, e.g., 
> from joins that rely on watermarks.
> Quoting from the user ML thread:
> In org.apache.flink.streaming.api.operators.SourceOperator, there are 
> separate instances of WatermarksWithIdleness created for each split output 
> and the main output. There is multiplexing of watermarks between split 
> outputs but no multiplexing between split output and main output.
>  
> For a source such as org.apache.flink.connector.kafka.source.KafkaSource, 
> there is only output from splits and no output from main. 
> Hence the main output will (after an initial timeout) be marked as 
> idle.
>
> The implementation of WatermarksWithIdleness is such 
> that once an output is idle, it will periodically re-mark the output as idle. 
> Since there is no multiplexing between split outputs and main output, the 
> idle marks coming from main output will repeatedly set the output to idle 
> even though there are events from the splits. Result is that the entire 
> source is repeatedly marked as idle.
> See this ML thread for more details: 
> [https://lists.apache.org/thread/bbokccohs16tzkdtybqtv1vx76gqkqj4]
> This probably affects older versions of Flink as well, but that needs to be 
> verified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
