[ 
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143818#comment-17143818
 ] 

Chesnay Schepler commented on FLINK-17075:
------------------------------------------

h3. Problem description:

The {{JobMaster}} keeps track of the state of all {{Executions}} belonging to a 
job. After deployment, this tracking relies on updates from the 
{{TaskExecutors}}, transmitted via dedicated RPC messages.
If one such message is lost then the tracked state may no longer match the 
actual one. In the worst case this prevents an {{Execution}} from ever reaching 
a terminal state, which in turn prevents the job from terminating.

h3. Proposed solution:

To prevent the worst case from happening, we propose that the {{TaskExecutor}} 
also submits a report of all currently deployed {{Tasks}} (identified by the 
{{ExecutionAttemptID}}) with each heartbeat. This allows us to detect 
discrepancies between the sets of executions known to the {{JobMaster}} and the 
{{TaskExecutor}}, and to act accordingly.

In the end this boils down to a comparison of two {{Set<ExecutionAttemptID>}} instances.

If an execution exists only in the {{JobMaster}} set, then the execution was 
dropped by the {{TaskExecutor}}.
This could imply a loss of a terminal state transition.
We cannot determine which terminal state the task reached, since all 
information has already been cleaned up.
In this case we will fail the execution in the {{ExecutionGraph}}, typically 
resulting in a restart.

If an execution exists only in the {{TaskExecutor}} set, then some leftover 
task from a previous attempt is still running on the {{TaskExecutor}}.
In this case we will cancel the task on the {{TaskExecutor}}. Running jobs are 
unaffected.

If an execution exists in both sets, then we don't do anything.
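
As a toy illustration of that comparison (plain strings stand in for 
{{ExecutionAttemptIDs}}; this is not meant as the actual implementation):

{code}
// Toy example; "A".."D" stand in for ExecutionAttemptIDs.
Set<String> jobMasterSet    = new HashSet<>(Arrays.asList("A", "B", "C"));
Set<String> taskExecutorSet = new HashSet<>(Arrays.asList("B", "C", "D"));

Set<String> onlyOnJobMaster = new HashSet<>(jobMasterSet);
onlyOnJobMaster.removeAll(taskExecutorSet);      // {A} -> fail A in the ExecutionGraph

Set<String> onlyOnTaskExecutor = new HashSet<>(taskExecutorSet);
onlyOnTaskExecutor.removeAll(jobMasterSet);      // {D} -> cancel D on the TaskExecutor

// B and C exist in both sets -> no action
{code}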

h4. Required changes:

{{TaskExecutor}}
------------------------------

The existing {{TaskSlotTable}} supports iterating over all {{Tasks}} for a 
given {{JobID}}, allowing us to extract the {{ExecutionAttemptID}}.
From this we generate a {{Set<ExecutionAttemptID>}} and submit it via 
heartbeats.
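
A rough sketch of how that report could be assembled on the {{TaskExecutor}} 
side (assuming {{TaskSlotTable#getTasks(JobID)}} and {{Task#getExecutionId()}} 
as accessors; the exact method names are an assumption):

{code}
// Sketch only; the accessor names on TaskSlotTable/Task are assumptions.
private Set<ExecutionAttemptID> createExecutionReport(JobID jobId) {
        Set<ExecutionAttemptID> deployedExecutions = new HashSet<>();
        Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
        while (tasks.hasNext()) {
                deployedExecutions.add(tasks.next().getExecutionId());
        }
        // attached to the heartbeat payload that is sent to the JobMaster
        return deployedExecutions;
}
{code}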


{{JobMaster}}
------------------------------

Here we need to be able to:
a) (un)track actually deployed {{Executions}}
b) cancel tasks on the {{TaskExecutor}}
c) fail tasks in the {{ExecutionGraph}}

These capabilities are split across 2 new components:
1) ExecutionDeploymentTracker
2) ExecutionDeploymentReconciler

1) The tracker lives in the Scheduler, with the following interface:

{code}
public interface ExecutionDeploymentTracker {
        /** Start tracking an execution that was deployed to a TaskExecutor. */
        void startTrackingDeployment(ExecutionAttemptID deployment);

        /** Stop tracking an execution, e.g. because it reached a terminal state. */
        void stopTrackingDeployment(ExecutionAttemptID deployment);

        /** Returns all currently tracked executions. */
        Set<ExecutionAttemptID> getExecutions();
}
{code}

It's basically a {{Set<ExecutionAttemptID>}}.
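
For illustration, a minimal implementation could look roughly as follows (the 
class name is made up; this is a sketch, not necessarily the final design):

{code}
// Sketch of a minimal tracker; effectively a Set<ExecutionAttemptID>.
public class DefaultExecutionDeploymentTracker implements ExecutionDeploymentTracker {

        private final Set<ExecutionAttemptID> deployments = new HashSet<>();

        @Override
        public void startTrackingDeployment(ExecutionAttemptID deployment) {
                deployments.add(deployment);
        }

        @Override
        public void stopTrackingDeployment(ExecutionAttemptID deployment) {
                deployments.remove(deployment);
        }

        @Override
        public Set<ExecutionAttemptID> getExecutions() {
                // defensive copy so callers cannot mutate the internal state
                return new HashSet<>(deployments);
        }
}
{code}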

The tracker is notified by the {{ExecutionGraph}} of deployed/finished 
executions through 2 new listeners:

{code}
public interface ExecutionDeploymentListener {
        void onCompletedDeployment(ExecutionAttemptID execution);
}

public interface ExecutionStateUpdateListener {
        void onStateUpdate(ExecutionAttemptID execution, ExecutionState newState);
}
{code}

{{onCompletedDeployment}} is called in {{Execution#deploy}} when the deployment 
future completes; an implementation will initiate the tracking.
{{onStateUpdate}} is called in {{Execution#transitionState}} on any successful 
state transition; an implementation will stop tracking if the new state is a 
terminal one.
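
For illustration, both listeners could simply delegate to the tracker (sketch 
only, assuming {{ExecutionState#isTerminal()}} for the terminal-state check):

{code}
// Sketch: listeners delegating to an ExecutionDeploymentTracker instance.
ExecutionDeploymentListener deploymentListener =
        execution -> tracker.startTrackingDeployment(execution);

ExecutionStateUpdateListener stateUpdateListener =
        (execution, newState) -> {
                if (newState.isTerminal()) {
                        tracker.stopTrackingDeployment(execution);
                }
        };
{code}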

Note: The deployment listener is required since there is no dedicated state for 
a deployed task;
executions are switched to DEPLOYING, submitted to the {{TaskExecutor}},
and switched to RUNNING only after an update from the {{TaskExecutor}}.
Since this update can be lost, we cannot rely on it.
A dedicated DEPLOYED state would be preferable, but this would require too many 
changes to the {{ExecutionGraph}} at this time.

2) The reconciler lives in the {{JobMaster}}; it uses the IDs provided by the 
tracker and by {{TaskExecutor}} heartbeats to detect mismatches, and fires 
events accordingly.
By defining a {{ReconciliationHandler}} the {{JobMaster}} can decide how each 
case should be handled:

{code}
public interface ExecutionDeploymentReconciler {

        // conceptual factory interface
        interface Factory {
                ExecutionDeploymentReconciler get(ReconciliationHandler trigger);
        }

        void reconcileExecutionStates(
                ResourceID origin,
                DeploymentReport deploymentReport,
                Set<ExecutionAttemptID> knownExecutionAttemptIds);

        interface ExecutionIdsProvider {
                Set<ExecutionAttemptID> getExecutions();
        }

        interface ReconciliationHandler {
                // fail the execution in the ExecutionGraph
                void onMissingDeployment(ExecutionAttemptID deployment);

                // cancel the task on the TaskExecutor
                void onUnknownDeployment(ExecutionAttemptID deployment, ResourceID hostingTaskExecutor);
        }
}
{code}
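
A possible implementation of the reconciler could look roughly like this 
(sketch only; the class name and {{DeploymentReport#getExecutions()}} accessor 
are assumptions):

{code}
// Sketch of a reconciler; the DeploymentReport accessor is an assumption.
public class DefaultExecutionDeploymentReconciler implements ExecutionDeploymentReconciler {

        private final ReconciliationHandler handler;

        public DefaultExecutionDeploymentReconciler(ReconciliationHandler handler) {
                this.handler = handler;
        }

        @Override
        public void reconcileExecutionStates(
                        ResourceID origin,
                        DeploymentReport deploymentReport,
                        Set<ExecutionAttemptID> knownExecutionAttemptIds) {

                Set<ExecutionAttemptID> reported = deploymentReport.getExecutions();

                for (ExecutionAttemptID id : knownExecutionAttemptIds) {
                        if (!reported.contains(id)) {
                                // tracked by the JobMaster but gone on the TaskExecutor
                                handler.onMissingDeployment(id);
                        }
                }
                for (ExecutionAttemptID id : reported) {
                        if (!knownExecutionAttemptIds.contains(id)) {
                                // running on the TaskExecutor but unknown to the JobMaster
                                handler.onUnknownDeployment(id, origin);
                        }
                }
        }
}
{code}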

> Add task status reconciliation between TM and JM
> ------------------------------------------------
>
>                 Key: FLINK-17075
>                 URL: https://issues.apache.org/jira/browse/FLINK-17075
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Till Rohrmann
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest to let the 
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of 
> the heartbeat payload (similar to FLINK-11059). This would allow to reconcile 
> the states of both components in case that a status update message was lost 
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E


