[ https://issues.apache.org/jira/browse/FLINK-23772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402052#comment-17402052 ]
Yun Gao commented on FLINK-23772:
---------------------------------

[~pnowojski] and I had some offline discussion on this issue. With the current implementation, it cannot happen that the tasks are only partially finished while the operators they contain get a FullyFinishedOperatorState. For the first run:
# If a task has called its operators' finish() methods but has not yet reached FINISHED, we can still snapshot the state of all its operators. In this case the checkpoint contains the normal snapshots of the subtasks and no finished flags.
# If a task is already FINISHED on the JM side, we cannot take a snapshot of the operator subtasks it contains. We therefore mark these subtasks as finished, and the operators become partially finished.
# If all the tasks are FINISHED on the JM side, the operators they contain are marked as fully finished.

As a whole, operators chained together are therefore always in the same state, so this issue should not exist.

By the way, after the job is restarted, we do not allow users to modify the topology if there is finished state. Currently we enforce this rule by checking that:
1. The operators of the same vertex have the same state.
2. The predecessors of a fully finished vertex are also fully finished.
3. If a vertex is partly finished, its predecessors connected via ALL_TO_ALL edges are fully finished.
4. If a vertex is partly finished, its predecessors connected via POINTWISE edges are partly finished or fully finished.
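The snapshot cases and the four restore-time checks above can be modelled in a few lines. This is a minimal sketch, not Flink's implementation: `FinishedStatus`, `EdgeKind`, `VertexSketch`, `fromSubtasks` and `FinishedStateValidator` are hypothetical names, and a vertex's status is assumed to be that of its first chained operator once rule 1 has been checked.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

// Hypothetical three-valued status; not a Flink class.
enum FinishedStatus { NOT_FINISHED, PARTLY_FINISHED, FULLY_FINISHED }

// Hypothetical edge kinds mirroring the ALL_TO_ALL / POINTWISE distinction.
enum EdgeKind { ALL_TO_ALL, POINTWISE }

final class VertexSketch {
    final String name;
    final List<FinishedStatus> operatorStatuses; // one entry per chained operator
    final List<VertexSketch> inputs = new ArrayList<>();
    final List<EdgeKind> inputKinds = new ArrayList<>();

    VertexSketch(String name, List<FinishedStatus> operatorStatuses) {
        if (operatorStatuses.isEmpty()) {
            throw new IllegalArgumentException("a vertex needs at least one operator");
        }
        this.name = name;
        this.operatorStatuses = List.copyOf(operatorStatuses);
    }

    void addInput(VertexSketch upstream, EdgeKind kind) {
        inputs.add(upstream);
        inputKinds.add(kind);
    }

    // Assumption: once rule 1 holds, the first operator's status speaks for the vertex.
    FinishedStatus status() {
        return operatorStatuses.get(0);
    }

    // Models the snapshot cases: no subtask FINISHED on the JM side -> normal snapshot,
    // some FINISHED -> partly finished, all FINISHED -> fully finished.
    static FinishedStatus fromSubtasks(boolean[] finishedOnJobManager) {
        int finished = 0;
        for (boolean f : finishedOnJobManager) {
            if (f) {
                finished++;
            }
        }
        if (finished == 0) {
            return FinishedStatus.NOT_FINISHED;
        }
        return finished == finishedOnJobManager.length
                ? FinishedStatus.FULLY_FINISHED
                : FinishedStatus.PARTLY_FINISHED;
    }
}

final class FinishedStateValidator {
    // Returns one message per violated rule; an empty list means the topology is accepted.
    static List<String> validate(List<VertexSketch> vertices) {
        List<String> errors = new ArrayList<>();
        for (VertexSketch v : vertices) {
            // Rule 1: operators chained in one vertex must share the same status.
            if (EnumSet.copyOf(v.operatorStatuses).size() > 1) {
                errors.add(v.name + ": chained operators have different finished states");
            }
            for (int i = 0; i < v.inputs.size(); i++) {
                FinishedStatus up = v.inputs.get(i).status();
                EdgeKind kind = v.inputKinds.get(i);
                if (v.status() == FinishedStatus.FULLY_FINISHED
                        && up != FinishedStatus.FULLY_FINISHED) {
                    // Rule 2: predecessors of a fully finished vertex must be fully finished.
                    errors.add(v.name + ": fully finished but an input is not");
                } else if (v.status() == FinishedStatus.PARTLY_FINISHED) {
                    if (kind == EdgeKind.ALL_TO_ALL && up != FinishedStatus.FULLY_FINISHED) {
                        // Rule 3: ALL_TO_ALL predecessors must be fully finished.
                        errors.add(v.name + ": ALL_TO_ALL input is not fully finished");
                    } else if (kind == EdgeKind.POINTWISE && up == FinishedStatus.NOT_FINISHED) {
                        // Rule 4: POINTWISE predecessors must be at least partly finished.
                        errors.add(v.name + ": POINTWISE input is not finished at all");
                    }
                }
            }
        }
        return errors;
    }
}
```

In this model, a chain like the one in the quoted question, with `src1 (finished state) -> foo2`, would show up as a vertex whose operator statuses disagree and would be rejected by rule 1. Inputs are stored together with their edge kind because rules 3 and 4 differ only in the edge type.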
> Double check if non-keyed FullyFinishedOperatorState can be mixed up with non finished OperatorState on recovery
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23772
>                 URL: https://issues.apache.org/jira/browse/FLINK-23772
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0
>            Reporter: Piotr Nowojski
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> I'm not sure if with non-keyed state we have an issue that it can be reshuffled to different operators during recovery. Are there any guarantees that if subtask 1 has state A, while subtask 2 has B, that after recovery it won’t be rotated?
> # is this an issue?
> # if so, if we have partially finished tasks with some operators having {{FullyFinishedOperatorState}}, what prevents {{VerticesFinishedCache.calculateIfFinished}} from failing if the {{FullyFinishedOperatorState}} gets assigned to an operator chain with non finished operator?
> For example an operator chain with parallelism of two, non-keyed, before recovery:
> {noformat}
> src1 (finished state) -> foo1 (finished state)
> src2 -> foo2
> {noformat}
> Can we end up after recovery with:
> {noformat}
> src1 (finished state) -> foo2
> src2 -> foo1 (finished state)
> {noformat}
> ?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)