Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5931

I looked into the problem @GJL reported and I think it is not caused by killed TMs which don't get the chance to tell the JM about their slot allocations (even though this can theoretically happen). In such a case, the protocol currently relies on the slot request timeout on the JM side to resend new requests.

I think the actual problem is related to #5980: the `ExecutionGraph` does not wait until all of its slots have been properly returned to the `SlotPool` before it is restarted in case of a recovery. Due to this, some of the old tasks might still occupy slots which are actually needed for the new tasks. If this happens, the resulting task-to-slot assignment can be suboptimal, meaning that the tasks are spread across more slots than needed.

For example, assume that we have two slots, each containing a mapper and a sink task (`S1`, `S2`, `S3` denote slots; `M1`/`M2` are the mapper tasks and `S1_old`/`S2_old` etc. the sink tasks):

`S1: M1_old, S1_old`
`S2: M2_old, S2_old`

Now a failover happens and the system restarts the `ExecutionGraph`. At this point `M1_old` and `S2_old` have not yet been properly released:

`S1: M1_old`
`S2: S2_old`

Now we try to schedule the new tasks, which suddenly need 3 slots:

`S1: M1_old, S1_new`
`S2: M2_new, S2_old`
`S3: M1_new, S2_new`

After the old tasks have been released, it looks like this:

`S1: S1_new`
`S2: M2_new`
`S3: M1_new, S2_new`

In @GJL's tests we had the situation that we could only allocate one additional container due to resource limitations. Thus, if we actually needed 2 additional containers, the program could not start. By properly waiting for the slot release, such a situation should no longer happen.

However, #5980 does not solve the problem of early killed TMs which could not communicate with the JM. At the moment we would have to rely on the slot request timeouts to resolve this situation.
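
To make the "only restart once all slots are back" idea behind #5980 concrete, here is a minimal Java sketch. The `OldExecution` interface, its `releaseSlotFuture()` method and the `restartExecutionGraph` callback are hypothetical placeholders for illustration, not Flink's actual internal API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Sketch of "wait for slot release before restarting the ExecutionGraph".
 * All names below are illustrative placeholders, not the real Flink classes.
 */
public class RestartAfterSlotRelease {

    /** Placeholder for an execution of the old job attempt. */
    interface OldExecution {
        /** Completes once this execution's slot has been returned to the SlotPool. */
        CompletableFuture<Void> releaseSlotFuture();
    }

    /**
     * Restart only after every old execution has handed its slot back.
     * Otherwise the new tasks may be scheduled while the old tasks still
     * occupy some of the existing slots, spreading the job over extra slots.
     */
    static CompletableFuture<Void> restartWhenSlotsReturned(
            List<OldExecution> oldExecutions,
            Runnable restartExecutionGraph) {

        CompletableFuture<?>[] releaseFutures = oldExecutions.stream()
                .map(OldExecution::releaseSlotFuture)
                .toArray(CompletableFuture[]::new);

        // allOf completes only when every release future is done, so the restart
        // cannot race with old tasks that still occupy their slots.
        return CompletableFuture.allOf(releaseFutures)
                .thenRun(restartExecutionGraph);
    }
}
```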
---