[ https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143818#comment-17143818 ]
Chesnay Schepler commented on FLINK-17075:
------------------------------------------

h3. Problem description:

The {{JobMaster}} keeps track of the state of all {{Executions}} belonging to a job. After deployment, this tracking relies on updates from the {{TaskExecutors}}, transmitted via dedicated RPC messages. If such a message is lost, the tracked state may no longer match the actual one. In the worst case this prevents an {{Execution}} from ever reaching a terminal state, which in turn prevents the job from terminating.

h3. Proposed solution:

To prevent the worst case from happening, we propose that the {{TaskExecutor}} also submit a report of all currently deployed {{Tasks}} (identified by their {{ExecutionAttemptID}}) with each heartbeat. This allows us to detect discrepancies between the sets of executions known to the {{JobMaster}} and the {{TaskExecutor}}, and act accordingly. In the end this boils down to a comparison of 2 {{Set<ExecutionAttemptID>}}:

* If an execution exists only in the {{JobMaster}} set, then the execution was dropped by the {{TaskExecutor}}. This could imply the loss of a terminal state transition; we cannot determine which terminal state the task reached, since all information was already cleaned up. In this case we fail the execution in the {{ExecutionGraph}}, typically resulting in a restart.
* If an execution exists only in the {{TaskExecutor}} set, then some leftover task from a previous attempt is still running on the {{TaskExecutor}}. In this case we cancel the task on the {{TaskExecutor}}; running jobs are unaffected.
* If an execution exists in both sets, then nothing needs to be done.

h4. Required changes:

{{TaskExecutor}}
------------------------------

The existing {{TaskSlotTable}} supports iterating over all {{Tasks}} for a given {{JobID}}, allowing us to extract the {{ExecutionAttemptID}} of each deployed task. From this we generate a {{Set<ExecutionAttemptID>}} and submit it via heartbeats.

{{JobMaster}}
------------------------------

Here we need to be able to:
a) (un)track actually deployed {{Executions}}
b) cancel tasks on the {{TaskExecutor}}
c) fail tasks in the {{ExecutionGraph}}

These capabilities are split across 2 new components:
1) ExecutionDeploymentTracker
2) ExecutionDeploymentReconciler

1) The tracker lives in the Scheduler, with the following interface:

{code}
public interface ExecutionDeploymentTracker {
    void startTrackingDeployment(ExecutionAttemptID deployment);
    void stopTrackingDeployment(ExecutionAttemptID deployment);
    Set<ExecutionAttemptID> getExecutions();
}
{code}

It is essentially a {{Set<ExecutionAttemptID>}}. The tracker is notified by the {{ExecutionGraph}} of deployed/finished executions through 2 new listeners:

{code}
public interface ExecutionDeploymentListener {
    void onCompletedDeployment(ExecutionAttemptID execution);
}

public interface ExecutionStateUpdateListener {
    void onStateUpdate(ExecutionAttemptID execution, ExecutionState newState);
}
{code}

{{onCompletedDeployment}} is called in {{Execution#deploy}} when the deployment future completes; an implementation will initiate the tracking. {{onStateUpdate}} is called in {{Execution#transitionState}} on any successful state transition; an implementation will stop the tracking if the new state is a terminal one.

Note: The deployment listener is required since there is no dedicated state for a deployed task; executions are switched to DEPLOYING, submitted to the {{TaskExecutor}}, and switched to RUNNING after an update from the {{TaskExecutor}}. Since this update can be lost we cannot rely on it. A dedicated DEPLOYED state would be preferable, but this would require too many changes to the {{ExecutionGraph}} at this time.
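To make this concrete, here is a minimal sketch of a tracker implementing all three interfaces; the class name is hypothetical, while {{ExecutionState#isTerminal}} is the existing predicate on the state enum:

{code}
import java.util.HashSet;
import java.util.Set;

/** Hypothetical tracker backing the proposed interfaces with a plain set. */
public class SetBackedExecutionDeploymentTracker
        implements ExecutionDeploymentTracker, ExecutionDeploymentListener, ExecutionStateUpdateListener {

    private final Set<ExecutionAttemptID> deployedExecutions = new HashSet<>();

    @Override
    public void startTrackingDeployment(ExecutionAttemptID deployment) {
        deployedExecutions.add(deployment);
    }

    @Override
    public void stopTrackingDeployment(ExecutionAttemptID deployment) {
        deployedExecutions.remove(deployment);
    }

    @Override
    public Set<ExecutionAttemptID> getExecutions() {
        // defensive copy so heartbeat processing sees a stable snapshot
        return new HashSet<>(deployedExecutions);
    }

    // called from Execution#deploy once the deployment future completes
    @Override
    public void onCompletedDeployment(ExecutionAttemptID execution) {
        startTrackingDeployment(execution);
    }

    // called from Execution#transitionState on every successful transition
    @Override
    public void onStateUpdate(ExecutionAttemptID execution, ExecutionState newState) {
        if (newState.isTerminal()) {
            stopTrackingDeployment(execution);
        }
    }
}
{code}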
2) The reconciler lives in the {{JobMaster}} and uses the IDs provided by the tracker and the {{TaskExecutor}} heartbeats to detect mismatches and fire events accordingly. By defining a {{ReconciliationHandler}} the {{JobMaster}} can decide how each case should be handled:

{code}
public interface ExecutionDeploymentReconciler {

    // conceptual factory interface
    interface Factory {
        ExecutionDeploymentReconciler get(ReconciliationHandler trigger);
    }

    void reconcileExecutionStates(ResourceID origin, DeploymentReport deploymentReport, Set<ExecutionAttemptID> knownExecutionAttemptIds);

    interface ExecutionIdsProvider {
        Set<ExecutionAttemptID> getExecutions();
    }

    interface ReconciliationHandler {
        // fail the execution in the ExecutionGraph
        void onMissingDeployment(ExecutionAttemptID deployment);

        // cancel the task on the TaskExecutor
        void onUnknownDeployment(ExecutionAttemptID deployment, ResourceID hostingTaskExecutor);
    }
}
{code}
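A minimal sketch of the reconciliation itself, implementing the three cases from the proposal above; the class name and the {{DeploymentReport}} accessor are assumptions, not part of this proposal:

{code}
import java.util.HashSet;
import java.util.Set;

/** Hypothetical reconciler that compares the two sets and notifies the handler. */
public class SetDifferenceExecutionDeploymentReconciler implements ExecutionDeploymentReconciler {

    private final ReconciliationHandler handler;

    public SetDifferenceExecutionDeploymentReconciler(ReconciliationHandler handler) {
        this.handler = handler;
    }

    @Override
    public void reconcileExecutionStates(
            ResourceID origin,
            DeploymentReport deploymentReport,
            Set<ExecutionAttemptID> knownExecutionAttemptIds) {

        // assumption: the report exposes the deployed executions as a set
        Set<ExecutionAttemptID> reportedIds = deploymentReport.getExecutionAttemptIds();

        // known only to the JobMaster: a terminal state transition may have been
        // lost, so fail the execution in the ExecutionGraph
        Set<ExecutionAttemptID> missing = new HashSet<>(knownExecutionAttemptIds);
        missing.removeAll(reportedIds);
        missing.forEach(handler::onMissingDeployment);

        // known only to the TaskExecutor: leftover task from a previous attempt, cancel it
        Set<ExecutionAttemptID> unknown = new HashSet<>(reportedIds);
        unknown.removeAll(knownExecutionAttemptIds);
        unknown.forEach(id -> handler.onUnknownDeployment(id, origin));

        // executions present in both sets require no action
    }
}
{code}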
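The {{JobMaster}} would then invoke the reconciler whenever it processes a heartbeat payload from a {{TaskExecutor}}, roughly along these lines (the method name and signature are illustrative only):

{code}
// illustrative: called with the payload the TaskExecutor attached to its heartbeat
void reportPayload(ResourceID taskExecutorId, DeploymentReport deploymentReport) {
    reconciler.reconcileExecutionStates(
        taskExecutorId,
        deploymentReport,
        executionDeploymentTracker.getExecutions());
}
{code}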