[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984131#comment-15984131 ]
ASF GitHub Bot commented on FLINK-5867: --------------------------------------- Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/3773#discussion_r113361061 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java --- @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * FailoverRegion manages the failover of a minimal pipeline connected sub graph. + * It will change from CREATED to CANCELING and then to CANCELLED and at last to RUNNING, + */ +public class FailoverRegion { + + private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state"); + + /** The log object used for debugging. 
*/ + private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class); + + // ------------------------------------------------------------------------ + + /** a unique id for debugging */ + private final AbstractID id = new AbstractID(); + + private final ExecutionGraph executionGraph; + + private final List<ExecutionVertex> connectedExecutionVertexes; + + /** The executor that executes the recovery action after all vertices are in a */ + private final Executor executor; + + /** Current status of the job execution */ + private volatile JobStatus state = JobStatus.RUNNING; + + + public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) { + this.executionGraph = checkNotNull(executionGraph); + this.executor = checkNotNull(executor); + this.connectedExecutionVertexes = checkNotNull(connectedExecutions); + + LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions); + } + + public void onExecutionFail(Execution taskExecution, Throwable cause) { + // TODO: check if need to failover the preceding region + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + executionGraph.failGlobal(cause); + } + else { + cancel(taskExecution.getGlobalModVersion()); + } + } + + private void allVerticesInTerminalState(long globalModVersionOfFailover) { + while (true) { + JobStatus curStatus = this.state; + if (curStatus.equals(JobStatus.CANCELLING)) { + if (transitionState(curStatus, JobStatus.CANCELED)) { + reset(globalModVersionOfFailover); + break; + } + } + else { + LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", id, state); + break; + } + } + } + + public JobStatus getState() { + return state; + } + + /** + * get all execution vertexes contained in this region + */ + public List<ExecutionVertex> getAllExecutionVertexes() { + return connectedExecutionVertexes; + } + + 
// Notice the region to failover, + private void failover(long globalModVersionOfFailover) { + if (!executionGraph.getRestartStrategy().canRestart()) { + executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail")); + } + else { + JobStatus curStatus = this.state; + if (curStatus.equals(JobStatus.RUNNING)) { + cancel(globalModVersionOfFailover); + } + else if (curStatus.equals(JobStatus.CANCELED)) { + reset(globalModVersionOfFailover); + } + else { + LOG.info("FailoverRegion {} is {} when notified to failover.", id, state); + } + } + } + + // cancel all executions in this sub graph + private void cancel(final long globalModVersionOfFailover) { + while (true) { + JobStatus curStatus = this.state; + if (curStatus.equals(JobStatus.RUNNING)) { + if (transitionState(curStatus, JobStatus.CANCELLING)) { + + // we build a future that is complete once all vertices have reached a terminal state + final ArrayList<Future<?>> futures = new ArrayList<>(connectedExecutionVertexes.size()); + + // cancel all tasks (that still need cancelling) + for (ExecutionVertex vertex : connectedExecutionVertexes) { + futures.add(vertex.cancel()); + } + + final FutureUtils.ConjunctFuture allTerminal = FutureUtils.combineAll(futures); + allTerminal.thenAcceptAsync(new AcceptFunction<Void>() { + @Override + public void accept(Void value) { + allVerticesInTerminalState(globalModVersionOfFailover); + } + }, executor); + + break; + } + } + else { + LOG.info("FailoverRegion {} is {} when cancel.", id, state); + break; + } + } + } + + // reset all executions in this sub graph + private void reset(long globalModVersionOfFailover) { + try { + // reset all connected ExecutionVertexes + final Collection<CoLocationGroup> colGroups = new HashSet<>(); + final long restartTimestamp = System.currentTimeMillis(); + + for (ExecutionVertex ev : connectedExecutionVertexes) { + CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup(); + if (cgroup != null && !colGroups.contains(cgroup)){ + 
cgroup.resetConstraints(); + colGroups.add(cgroup); + } + + ev.resetForNewExecution(restartTimestamp, globalModVersionOfFailover); + } + if (transitionState(JobStatus.CANCELED, JobStatus.CREATED)) { + restart(globalModVersionOfFailover); + } + else { + LOG.info("FailoverRegion {} switched from CANCELLING to CREATED fail, will fail this region again.", id); + failover(globalModVersionOfFailover); + } + } + catch (GlobalModVersionMismatch e) { + // happens when a global recovery happens concurrently to the regional recovery + // go back to a clean state + state = JobStatus.CREATED; --- End diff -- The initial state of a FailoverRegion should be RUNNING: by the time a FailoverRegion is in use, the executions in it have always already been scheduled. The CREATED state is only used during the failover process. > The implementation of RestartPipelinedRegionStrategy > ---------------------------------------------------- > > Key: FLINK-5867 > URL: https://issues.apache.org/jira/browse/FLINK-5867 > Project: Flink > Issue Type: Sub-task > Components: JobManager > Reporter: shuai.xu > Assignee: shuai.xu > > The RestartPipelinedRegionStrategy's responsibility is the following: > 1. Calculate all FailoverRegions and their relations when initializing. > 2. Listen for the failure of the job and executions, and find corresponding > FailoverRegions to do the failover. -- This message was sent by Atlassian JIRA (v6.3.15#6346)