[ https://issues.apache.org/jira/browse/FLINK-22326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360738#comment-17360738 ]

Dawid Wysakowicz commented on FLINK-22326:
------------------------------------------

Hey [~qqibrow],
First of all, I'd like to emphasize that we strongly discourage using the current
implementation of iteration in DataStream together with checkpoints. As you
already pointed out, you might face data loss, because the in-memory feedback
channel is not checkpointed. This contradicts the purpose of checkpoints, which
is to guard against data loss. Apart from that, you might also end up in a
deadlock caused by backpressure between the tail and the head of the iteration.
We feel that making a somewhat broken feature slightly less broken does not
justify giving it a high priority. Sorry if I disappoint you here.

As for your suggested fix, I think using the {{yield}} method is not a good
choice there, because it would force waiting on a mail in the mailbox, which
might never arrive. We would need to use {{tryYield}} instead, but that would
effectively turn the solution into a busy wait. A better solution would be to
replace the {{BlockingQueue}} with a queue that can notify us via a {{Future}}
when it has a new record. We could then {{suspend}} the default action of the
mailbox and resume it once there is a record in the queue, i.e. when the
{{Future}} completes, similar to what we do in the {{StreamTask}} when the
{{InputProcessor}} returns {{InputStatus#NOTHING_AVAILABLE}}. We could reuse,
e.g., the {{FutureCompletingBlockingQueue}}, which we would need to move to a
different module. This involves quite a few changes in the code base, so given
the priority I described in the first paragraph, I am not sure how much
capacity we can spend on reviewing such an improvement.
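
A minimal sketch of the future-based approach, in plain Java and under stated
assumptions: {{NotifyingQueue}} and {{ToyMailboxLoop}} are hypothetical names
used only for illustration, not Flink classes, and Flink's real
{{FutureCompletingBlockingQueue}} and mailbox suspend/resume APIs look
different. The queue completes an availability {{Future}} when a record arrives
instead of blocking in {{take()}}. When the queue is empty, the loop suspends
its default action and registers a callback that enqueues a resume mail once
that {{Future}} completes, so the task thread sleeps on the mailbox (where other
mails can still reach it) without busy-waiting as a {{tryYield}} loop would.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical feedback queue: signals availability through a future instead of blocking. */
final class NotifyingQueue<T> {
    private final Queue<T> elements = new ArrayDeque<>();
    // Invariant: this future is complete exactly when the queue is non-empty.
    private CompletableFuture<Void> available = new CompletableFuture<>();

    /** Producer side, e.g. the iteration tail emitting a feedback record. */
    synchronized void put(T element) {
        elements.add(element);
        available.complete(null); // no-op if it was already complete
    }

    /** Consumer side: returns null when empty instead of blocking like BlockingQueue#take. */
    synchronized T poll() {
        T head = elements.poll();
        if (elements.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>(); // re-arm for the next record
        }
        return head;
    }

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return available;
    }
}

/** Hypothetical, heavily simplified single-threaded mailbox loop (not Flink's MailboxProcessor). */
final class ToyMailboxLoop<T> {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
    private final NotifyingQueue<T> feedback;
    private final int stopAfter; // demo-only: stop after this many records
    private final List<T> processed = new ArrayList<>();
    private boolean suspended = false; // only touched on the loop thread
    private boolean running = true;

    ToyMailboxLoop(NotifyingQueue<T> feedback, int stopAfter) {
        this.feedback = feedback;
        this.stopAfter = stopAfter;
    }

    /** Mails (checkpoint triggers, resume signals, ...) may be enqueued from any thread. */
    void submitMail(Runnable mail) {
        mails.add(mail);
    }

    /** Default action: handle one feedback record, or suspend if none is available. */
    private void defaultAction() {
        T record = feedback.poll();
        if (record != null) {
            processed.add(record);
            running = processed.size() < stopAfter;
            return;
        }
        suspended = true;
        // Resume through a mail once data arrives; the callback only enqueues the
        // mail, and the flag itself is cleared later on the loop thread.
        feedback.getAvailabilityFuture().thenRun(() -> submitMail(() -> suspended = false));
    }

    /** Runs on the task thread until stopAfter records have been processed. */
    List<T> run() {
        try {
            while (running) {
                Runnable mail;
                while ((mail = mails.poll()) != null) {
                    mail.run(); // pending mails run between records
                }
                if (suspended) {
                    mails.take().run(); // sleep until any mail arrives, e.g. the resume mail
                } else {
                    defaultAction();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }
}
```

The {{stopAfter}} parameter exists only so this demo terminates; a real task
would run until it is cancelled. Completing the future inside the queue's lock
keeps the invariant simple (complete exactly when non-empty), at the cost of
running the registered callback on the producer thread, which here only
enqueues a mail.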

> Job contains Iterate Operator always fails on Checkpoint 
> ---------------------------------------------------------
>
>                 Key: FLINK-22326
>                 URL: https://issues.apache.org/jira/browse/FLINK-22326
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.1
>            Reporter: Lu Niu
>            Assignee: Lu Niu
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 2021-04-16 at 12.43.38 PM.png
>
>
> A job containing an Iterate operator always fails on checkpoint.
> How to reproduce: 
> [https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785]
> This is based on 
> [https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java],
> with a few differences:
> 1. Make maxWaitTime large enough when creating the IterativeStream.
> 2. No output is sent back to the Iterative Source.
> Result:
> The same code is able to checkpoint in 1.9.1
> !Screen Shot 2021-04-16 at 12.43.38 PM.png!
>  
> but always fails on checkpoint in 1.11
> !Screen Shot 2021-04-16 at 12.40.34 PM.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
