rkhachatryan commented on code in PR #22584: URL: https://github.com/apache/flink/pull/22584#discussion_r1194862690
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -136,19 +136,24 @@ public void assignStates() {
         // repartition state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            if (stateAssignment.hasNonFinishedState) {
+            if (stateAssignment.hasNonFinishedState
+                    // FLINK-31963: We need to run repartitioning for stateless operators that have
+                    // upstream output or downstream input states.
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {
                 assignAttemptState(stateAssignment);
             }
         }

         // actually assign the state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            // If upstream has output states, even the empty task state should be assigned for the
-            // current task in order to notify this task that the old states will send to it which
-            // likely should be filtered.
+            // If upstream has output states or downstream has input states, even the empty task
+            // state should be assigned for the current task in order to notify this task that the
+            // old states will send to it which likely should be filtered.
             if (stateAssignment.hasNonFinishedState
                     || stateAssignment.isFullyFinished
-                    || stateAssignment.hasUpstreamOutputStates()) {
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {

Review Comment:
I think it is; it's still an iteration over all upstream subtasks for every downstream subtask in the worst case, and it would be more consistent with the other checks. But I'm also fine with the current way.
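For readers following the thread, here is a minimal sketch of the two conditions as they read after this change. `AssignmentSketch` is a hypothetical stand-in, not Flink's `TaskStateAssignment`: it models only the four flags that appear in the hunk, and it uses plain boolean fields where the real class exposes `hasUpstreamOutputStates()` and `hasDownstreamInputStates()` as methods. How those methods compute their result is not shown in the hunk, so it is not modeled here.

```java
// Hypothetical stand-in for TaskStateAssignment; only the flags used in the diff are modeled.
final class AssignmentSketch {
    final boolean hasNonFinishedState;
    final boolean isFullyFinished;
    final boolean hasUpstreamOutputStates;   // a method in the real class; a field here (assumption)
    final boolean hasDownstreamInputStates;  // a method in the real class; a field here (assumption)

    AssignmentSketch(
            boolean hasNonFinishedState,
            boolean isFullyFinished,
            boolean hasUpstreamOutputStates,
            boolean hasDownstreamInputStates) {
        this.hasNonFinishedState = hasNonFinishedState;
        this.isFullyFinished = isFullyFinished;
        this.hasUpstreamOutputStates = hasUpstreamOutputStates;
        this.hasDownstreamInputStates = hasDownstreamInputStates;
    }

    // Mirrors the condition of the "repartition state" loop after the change.
    static boolean needsRepartitioning(AssignmentSketch a) {
        return a.hasNonFinishedState
                || a.hasUpstreamOutputStates
                || a.hasDownstreamInputStates;
    }

    // Mirrors the condition of the "actually assign the state" loop after the change.
    static boolean needsAssignment(AssignmentSketch a) {
        return a.hasNonFinishedState
                || a.isFullyFinished
                || a.hasUpstreamOutputStates
                || a.hasDownstreamInputStates;
    }

    public static void main(String[] args) {
        // A stateless operator whose downstream task has input states: the case from FLINK-31963.
        AssignmentSketch stateless = new AssignmentSketch(false, false, false, true);
        System.out.println("repartition: " + needsRepartitioning(stateless));
        System.out.println("assign:      " + needsAssignment(stateless));
    }
}
```

With the pre-change conditions, the same operator would have been skipped by both loops, since neither condition checked for downstream input states.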