GitHub user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/896#discussion_r34272101

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
    @@ -306,99 +308,100 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         if (!isConnected) {
           log.debug(s"Dropping message $message because the TaskManager is currently " +
             "not connected to a JobManager.")
    -    }
    +    } else {
    -    // we order the messages by frequency, to make sure the code paths for matching
    -    // are as short as possible
    -    message match {
    +      // we order the messages by frequency, to make sure the code paths for matching
    +      // are as short as possible
    +      message match {
    +
    +        // tell the task about the availability of a new input partition
    +        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    +          updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +
    +        // tell the task about the availability of some new input partitions
    +        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    +          updateTaskInputPartitions(executionID, partitionInfos)
    +
    +        // discards intermediate result partitions of a task execution on this TaskManager
    +        case FailIntermediateResultPartitions(executionID) =>
    +          log.info("Discarding the results produced by task execution " + executionID)
    +          if (network.isAssociated) {
    +            try {
    +              network.getPartitionManager.releasePartitionsProducedBy(executionID)
    +            } catch {
    +              case t: Throwable => killTaskManagerFatal(
    +                "Fatal leak: Unable to release intermediate result partition data", t)
    +            }
    +          }
    -      // tell the task about the availability of a new input partition
    -      case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
    -        updateTaskInputPartitions(executionID, List((resultID, partitionInfo)))
    +        // notifies the TaskManager that the state of a task has changed.
    +        // the TaskManager informs the JobManager and cleans up in case the transition
    +        // was into a terminal state, or in case the JobManager cannot be informed of the
    +        // state transition
    -      // tell the task about the availability of some new input partitions
    -      case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
    -        updateTaskInputPartitions(executionID, partitionInfos)
    +        case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
    -      // discards intermediate result partitions of a task execution on this TaskManager
    -      case FailIntermediateResultPartitions(executionID) =>
    -        log.info("Discarding the results produced by task execution " + executionID)
    -        if (network.isAssociated) {
    -          try {
    -            network.getPartitionManager.releasePartitionsProducedBy(executionID)
    -          } catch {
    -            case t: Throwable => killTaskManagerFatal(
    -              "Fatal leak: Unable to release intermediate result partition data", t)
    -          }
    -        }
    +          // we receive these from our tasks and forward them to the JobManager
    --- End diff --

    A lot of code here seems to have been changed without need (the changes have nothing to do with the accumulators). Since this is pretty sensitive code, I am very hesitant to commit these massive edits. What was the reason for these changes in the first place?