rkhachatryan commented on code in PR #22584:
URL: https://github.com/apache/flink/pull/22584#discussion_r1194862690


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java:
##########
@@ -136,19 +136,24 @@ public void assignStates() {
 
         // repartition state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            if (stateAssignment.hasNonFinishedState) {
+            if (stateAssignment.hasNonFinishedState
+                    // FLINK-31963: We need to run repartitioning for stateless operators that have
+                    // upstream output or downstream input states.
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {
                 assignAttemptState(stateAssignment);
             }
         }
 
         // actually assign the state
         for (TaskStateAssignment stateAssignment : vertexAssignments.values()) {
-            // If upstream has output states, even the empty task state should be assigned for the
-            // current task in order to notify this task that the old states will send to it which
-            // likely should be filtered.
+            // If upstream has output states or downstream has input states, even the empty task
+            // state should be assigned for the current task in order to notify this task that the
+            // old states will send to it which likely should be filtered.
             if (stateAssignment.hasNonFinishedState
                     || stateAssignment.isFullyFinished
-                    || stateAssignment.hasUpstreamOutputStates()) {
+                    || stateAssignment.hasUpstreamOutputStates()
+                    || stateAssignment.hasDownstreamInputStates()) {

Review Comment:
   I think it is: in the worst case it's still an iteration over all upstream
subtasks for every downstream subtask, and it would be more consistent with the
other checks.
   But I'm also fine with the current way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
