[ https://issues.apache.org/jira/browse/FLINK-22326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17360738#comment-17360738 ]
Dawid Wysakowicz commented on FLINK-22326:
------------------------------------------

Hey [~qqibrow],

First of all, I'd like to emphasize that we strongly discourage using the current implementation of iteration in DataStream together with checkpoints. As you already pointed out, you might face data loss because the in-memory feedback channel is not checkpointed. This contradicts the purpose of checkpoints, which is to guard against data loss. Apart from that, you might also end up in a deadlock caused by backpressure between the tail and the head of the iteration. We feel that making a somewhat broken feature slightly less broken is not worth giving it a high priority. Sorry to disappoint you here.

As for your suggested fix, I think the {{yield}} method is not a good choice there, because it forces waiting for a mail in the mailbox, which might never arrive. We would need to use {{tryYield}} there, but that would effectively turn the solution into a busy wait. A better solution would be to replace the {{BlockingQueue}} with a queue that can notify us via a {{Future}} when it has a new record. We could then {{suspend}} the default action of the mailbox and resume it once there is a record in the queue, i.e. when the {{Future}} completes. This is similar to what we do in the {{StreamTask}} when the {{InputProcessor}} returns {{InputStatus#NOTHING_AVAILABLE}}. We could reuse, e.g., the {{FutureCompletingBlockingQueue}}, which would need to be moved to a different module.

This involves quite a few changes in the code base, so given the priority I described in the first paragraph, I am not sure how much capacity we can spend on reviewing such an improvement.
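The future-based alternative described in the comment can be illustrated with a small, JDK-only Java sketch. It is not Flink code: NotifyingQueue, MiniMailbox, IterationHeadSketch, runDemo, suspendDefaultAction and resumeDefaultAction are hypothetical names chosen for illustration. They only mimic the roles of {{FutureCompletingBlockingQueue}} and of suspending the mailbox's default action, and do not reproduce Flink's actual APIs. The head's default action polls the feedback queue without blocking; when the queue is empty, the action suspends itself and registers a callback on the queue's availability future that posts a "resume" mail, so the mailbox thread only ever blocks while waiting for mail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical feedback queue: signals "data available" via a future instead of blocking. */
final class NotifyingQueue<T> {
    private final Queue<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    synchronized void put(T element) {
        elements.add(element);
        // No-op if already complete. Dependent callbacks run on this (producer) thread.
        available.complete(null);
    }

    /** Non-blocking: returns null when the queue is empty. */
    synchronized T poll() {
        T head = elements.poll();
        if (elements.isEmpty() && available.isDone()) {
            // Queue drained: arm a fresh future for the next put().
            available = new CompletableFuture<>();
        }
        return head;
    }

    synchronized CompletableFuture<Void> getAvailableFuture() {
        return available;
    }
}

/** Hypothetical single-threaded mailbox loop whose default action can be suspended. */
final class MiniMailbox {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
    private boolean suspended;      // only accessed from the mailbox thread
    private boolean running = true; // only accessed from the mailbox thread

    void submit(Runnable mail) { mails.add(mail); } // may be called from any thread
    void suspendDefaultAction() { suspended = true; }
    void resumeDefaultAction() { suspended = false; }
    void stop() { running = false; }

    void run(Runnable defaultAction) throws InterruptedException {
        while (running) {
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run(); // other work (e.g. checkpoint-related mails) is still processed
            }
            if (suspended) {
                mails.take().run(); // blocks on mail only: no busy wait
            } else {
                defaultAction.run();
            }
        }
    }
}

final class IterationHeadSketch {
    /** A tail thread feeds `records` (> 0) integers back to a mailbox-driven head. */
    static List<Integer> runDemo(int records) {
        NotifyingQueue<Integer> feedback = new NotifyingQueue<>();
        MiniMailbox mailbox = new MiniMailbox();
        List<Integer> processed = new ArrayList<>();

        Runnable headDefaultAction = () -> {
            Integer record = feedback.poll();
            if (record == null) {
                // Nothing available: suspend, and resume via a mail once the future completes.
                mailbox.suspendDefaultAction();
                feedback.getAvailableFuture()
                        .thenRun(() -> mailbox.submit(mailbox::resumeDefaultAction));
                return;
            }
            processed.add(record);
            if (processed.size() == records) {
                mailbox.stop();
            }
        };

        Thread tail = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                feedback.put(i);
            }
        });
        tail.start();
        try {
            mailbox.run(headDefaultAction);
            tail.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

The design point is that the only blocking call is the mailbox's own take(), so other mail can still be handled while the head is idle; a loop around a non-blocking tryYield-style check would instead spin. The sketch deliberately leaves out the other two problems raised in the comment: the feedback records are still not checkpointed, and backpressure between tail and head can still deadlock.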
> Job contains Iterate Operator always fails on Checkpoint
> ---------------------------------------------------------
>
>                 Key: FLINK-22326
>                 URL: https://issues.apache.org/jira/browse/FLINK-22326
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.1
>            Reporter: Lu Niu
>            Assignee: Lu Niu
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Screen Shot 2021-04-16 at 12.40.34 PM.png, Screen Shot 2021-04-16 at 12.43.38 PM.png
>
>
> A job containing an Iterate operator will always fail on checkpoint.
> How to reproduce:
> [https://gist.github.com/qqibrow/f297babadb0bb662ee398b9088870785]
> This is based on
> [https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java],
> but with a few lines of difference:
> 1. Make maxWaitTime large enough when creating the IterativeStream
> 2. No output back to the Iterative Source
> Result:
> The same code is able to checkpoint in 1.9.1
> !Screen Shot 2021-04-16 at 12.43.38 PM.png!
>
> but always fails on checkpoint in 1.11
> !Screen Shot 2021-04-16 at 12.40.34 PM.png!