GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491683

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) {
             return CompletableFuture.completedFuture(null);
         }
     }
+
+    protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) {
+        WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+        if (workerRegistration != null) {
+            slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause);
+        } else {
+            log.warn("TaskManager failed before registering with ResourceManager successfully.");
--- End diff --

This should be a debug log message.
---