Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r199315500

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
     				operatorBackPressureStats.orElse(null)));
     	}

    +	@Override
    +	public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
    --- End diff --

    My previous thought was that `RM` needed to notify `JM` of the `allocationIds` that had been assigned to it, because `SlotManager` may already have assigned slots to `JM` when the `TM` is killed before `JM` has established a connection to it. That was mainly to address the issue in https://issues.apache.org/jira/browse/FLINK-9351, but with the approach you suggested, I think the problem in FLINK-9351 is fixed as a side effect.
---