rkhachatryan commented on code in PR #22584: URL: https://github.com/apache/flink/pull/22584#discussion_r1193908276
########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java: ########## @@ -1029,6 +1192,15 @@ private JobVertex createJobVertex( return jobVertex; } + private List<TaskStateSnapshot> getRescalingDescriptorsFromVertex( Review Comment: `getTaskStateSnapshotFromVertex`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java: ########## @@ -136,19 +136,24 @@ public void assignStates() { // repartition state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { - if (stateAssignment.hasNonFinishedState) { + if (stateAssignment.hasNonFinishedState + // FLINK-31963: We need to run repartitioning for stateless operators that have + // upstream output or downstream input states. + || stateAssignment.hasUpstreamOutputStates() + || stateAssignment.hasDownstreamInputStates()) { assignAttemptState(stateAssignment); } } // actually assign the state for (TaskStateAssignment stateAssignment : vertexAssignments.values()) { - // If upstream has output states, even the empty task state should be assigned for the - // current task in order to notify this task that the old states will send to it which - // likely should be filtered. + // If upstream has output states or downstream has input states, even the empty task + // state should be assigned for the current task in order to notify this task that the + // old states will send to it which likely should be filtered. if (stateAssignment.hasNonFinishedState || stateAssignment.isFullyFinished - || stateAssignment.hasUpstreamOutputStates()) { + || stateAssignment.hasUpstreamOutputStates() + || stateAssignment.hasDownstreamInputStates()) { Review Comment: Should we compute once `hasUpstreamOutputStates` and `hasDownstreamInputStates` similar to `isFullyFinished`? (that would at least be consistent, and also faster) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org