AHeise commented on a change in pull request #13735: URL: https://github.com/apache/flink/pull/13735#discussion_r517337552
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
##########
@@ -99,32 +119,40 @@ public void assignStates() {
                         operatorID,
                         executionJobVertex.getParallelism(),
                         executionJobVertex.getMaxParallelism());
-                } else if (operatorState.getNumberCollectedStates() > 0) {
-                    statelessSubTasks = false;
                 }
-                operatorStates.add(operatorState);
+                operatorStates.put(operatorIDPair.getGeneratedOperatorID(), operatorState);
             }
-            if (!statelessSubTasks) { // skip tasks where no operator has any state
-                assignAttemptState(executionJobVertex, operatorStates);
+
+            final TaskStateAssignment stateAssignment = new TaskStateAssignment(executionJobVertex, operatorStates);
+            vertexAssignments.put(executionJobVertex, stateAssignment);
+            for (final IntermediateResult producedDataSet : executionJobVertex.getInputs()) {
+                consumerAssignment.put(producedDataSet, stateAssignment);

Review comment:
`UnionGate` reads from two or more different `IntermediateResult`s. An `IntermediateResult` translates directly into a subpartition/gate.

However, it's a good point that `IntermediateResult` doesn't override `equals`/`hashCode`. It currently works because the JobGraph is a graph of connected objects, but I don't know if we can rely on that assumption. I'm switching to `IntermediateDataSetID`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
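
For readers following the `equals`/`hashCode` point in the review comment, here is a minimal sketch of why the choice of map key matters. `ResultNode` and `DataSetId` are hypothetical stand-ins, not Flink's `IntermediateResult` or `IntermediateDataSetID`. The sketch assumes the ID type implements value-based `equals`/`hashCode`, which is what a switch to an ID key would rely on.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
final class MapKeyDemo {

    /** A graph node that inherits Object's identity-based equals/hashCode. */
    static final class ResultNode {
        final DataSetId id;

        ResultNode(DataSetId id) {
            this.id = id;
        }
    }

    /** A value-type ID that defines equals/hashCode on its content. */
    static final class DataSetId {
        private final long value;

        DataSetId(long value) {
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof DataSetId && ((DataSetId) o).value == value;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(value);
        }
    }

    /** Keys the map by node, then looks up a second instance of the same data set. */
    static boolean lookupByNode() {
        Map<ResultNode, String> byNode = new HashMap<>();
        byNode.put(new ResultNode(new DataSetId(42L)), "assignment");
        // Misses: the two nodes are distinct objects and equality is by identity.
        return byNode.containsKey(new ResultNode(new DataSetId(42L)));
    }

    /** Keys the map by ID, then looks up via a second instance of the same data set. */
    static boolean lookupById() {
        Map<DataSetId, String> byId = new HashMap<>();
        byId.put(new ResultNode(new DataSetId(42L)).id, "assignment");
        // Hits: the IDs compare equal by value even though the nodes differ.
        return byId.containsKey(new ResultNode(new DataSetId(42L)).id);
    }

    public static void main(String[] args) {
        System.out.println("keyed by node: " + lookupByNode());
        System.out.println("keyed by id:   " + lookupById());
    }
}
```

Keying by the node only works while every lookup uses the exact instance that was inserted. Keying by an ID with value semantics removes that dependency.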