Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198912464

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe
 			mainThreadExecutor);
 	}
 
+	public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID instanceID, Exception cause) {
+		final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
+		if (taskManagerRegistration != null) {
+			final HashMap<JobID, Set<AllocationID>> jobAndAllocationIDMap = new HashMap<>(4);
+			for (SlotID slotID : taskManagerRegistration.getSlots()) {
+				TaskManagerSlot taskManagerSlot = slots.get(slotID);
+				AllocationID allocationID = taskManagerSlot.getAllocationId();
+				if (allocationID != null) {
+					JobID jobId = taskManagerSlot.getJobId();
+					Set<AllocationID> jobAllocationIDSet = jobAndAllocationIDMap.get(jobId);
+					if (jobAllocationIDSet == null) {
+						jobAllocationIDSet = new HashSet<>(2);
+						jobAndAllocationIDMap.put(jobId, jobAllocationIDSet);
+					}
+					jobAllocationIDSet.add(allocationID);
+				}
+			}
+
+			for (Map.Entry<JobID, Set<AllocationID>> entry : jobAndAllocationIDMap.entrySet()) {
+				resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, entry.getValue(), cause);
+			}
+		} else {
+			LOG.warn("TaskManager failed before registering with slot manager successfully.");
+		}
--- End diff --

This looks a little bit complicated. Moreover, I don't really like that the control flow is ResourceManager -> SlotManager -> ResourceManager -> JobManager. What about leveraging the existing `ResourceActions#notifyAllocationFailure` method? We could call this method not only when a pending slot request fails but also whenever we remove a slot.
Then unregistering a `TaskManager` from the `SlotManager` would remove its slots, which in turn would trigger a `notifyAllocationFailure` message for each allocated slot. We would then have to introduce a `JobMasterGateway#notifyAllocationFailure` method, which we can call from `ResourceActionsImpl#notifyAllocationFailure`. On the `JobMaster` side, the implementation would simply call `SlotPool#failAllocation`. Doing it this way, we send multiple messages (which might not be ideal), but we reuse most of the existing code paths without introducing special-case logic. What do you think?
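A minimal, self-contained sketch of the proposed flow (SlotManager -> ResourceActions -> JobMasterGateway -> SlotPool), assuming heavily simplified stand-ins: IDs are plain `String`s instead of `JobID`/`AllocationID`/`InstanceID`, calls are synchronous instead of RPCs, and the class and method names only mirror the ones mentioned in the comment. `JobMasterGateway#notifyAllocationFailure` is the method the comment proposes to add; the bodies below are hypothetical and are not Flink's actual code or signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: simplified stand-ins for Flink types, String IDs, synchronous calls.
public class AllocationFailureFlowSketch {

    /** Stand-in for SlotPool; records which allocations were failed. */
    static class SlotPool {
        final List<String> failedAllocations = new ArrayList<>();

        void failAllocation(String allocationId, Exception cause) {
            failedAllocations.add(allocationId);
        }
    }

    /** Stand-in for the proposed JobMasterGateway#notifyAllocationFailure. */
    interface JobMasterGateway {
        void notifyAllocationFailure(String allocationId, Exception cause);
    }

    /** The JobMaster side simply delegates to its SlotPool. */
    static class JobMaster implements JobMasterGateway {
        final SlotPool slotPool = new SlotPool();

        @Override
        public void notifyAllocationFailure(String allocationId, Exception cause) {
            slotPool.failAllocation(allocationId, cause);
        }
    }

    /** Stand-in for ResourceActions. */
    interface ResourceActions {
        void notifyAllocationFailure(String jobId, String allocationId, Exception cause);
    }

    /** Stand-in for ResourceActionsImpl: forwards to the JobMaster of the affected job. */
    static class ResourceActionsImpl implements ResourceActions {
        final Map<String, JobMasterGateway> jobMasters = new HashMap<>();

        @Override
        public void notifyAllocationFailure(String jobId, String allocationId, Exception cause) {
            JobMasterGateway gateway = jobMasters.get(jobId);
            if (gateway != null) {
                gateway.notifyAllocationFailure(allocationId, cause);
            }
        }
    }

    /** A slot; a null allocationId means the slot is free. */
    static class Slot {
        final String jobId;
        final String allocationId;

        Slot(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    /** Stand-in SlotManager: no TaskManager-specific failure path, only slot removal. */
    static class SlotManager {
        final ResourceActions resourceActions;
        final Map<String, List<Slot>> slotsByTaskManager = new HashMap<>();

        SlotManager(ResourceActions resourceActions) {
            this.resourceActions = resourceActions;
        }

        void unregisterTaskManager(String instanceId, Exception cause) {
            List<Slot> slots = slotsByTaskManager.remove(instanceId);
            if (slots == null) {
                return; // unknown TaskManager: nothing to clean up
            }
            for (Slot slot : slots) {
                removeSlot(slot, cause);
            }
        }

        // Removing an allocated slot reuses the existing allocation-failure path.
        private void removeSlot(Slot slot, Exception cause) {
            if (slot.allocationId != null) {
                resourceActions.notifyAllocationFailure(slot.jobId, slot.allocationId, cause);
            }
        }
    }

    public static void main(String[] args) {
        ResourceActionsImpl resourceActions = new ResourceActionsImpl();
        JobMaster jobMaster = new JobMaster();
        resourceActions.jobMasters.put("job-1", jobMaster);

        SlotManager slotManager = new SlotManager(resourceActions);
        slotManager.slotsByTaskManager.put("tm-1", Arrays.asList(
                new Slot("job-1", "alloc-a"), new Slot("job-1", "alloc-b"), new Slot(null, null)));

        slotManager.unregisterTaskManager("tm-1", new Exception("TaskManager lost"));
        System.out.println(jobMaster.slotPool.failedAllocations); // prints [alloc-a, alloc-b]
    }
}
```

In this sketch there is no grouping by `JobID` and no TaskManager-specific notification: one `notifyAllocationFailure` call goes out per allocated slot, which is the "multiple messages" trade-off the comment accepts in exchange for reusing the existing path.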