[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14943454#comment-14943454 ]
ASF GitHub Bot commented on FLINK-2354: --------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41152092 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphs.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import java.util.Collections; +import java.util.List; + +/** + * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#STANDALONE}. + * + * <p>All operations are NoOps, because {@link JobGraph} instances cannot be recovered in this + * recovery mode. + */ +public class StandaloneSubmittedJobGraphs implements SubmittedJobGraphs { + + @Override + public void start(SubmittedJobGraphListener jobGraphListener) throws Exception { + // Nothing to do + } + + @Override + public void stop() { + // Nothing to do + } + + @Override + public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { + // Nothing to do + } + + @Override + public void removeJobGraph(JobID jobId) throws Exception { + // Nothing to do + } + + @Override + public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { + throw new IllegalStateException("StandaloneSubmittedJobGraphs cannot recover job graphs. " + + "How did you end up here?"); + } + + @Override + public List<SubmittedJobGraph> recoverJobGraphs() throws Exception { + return Collections.emptyList(); --- End diff -- For sure > Recover running jobs on JobManager failure > ------------------------------------------ > > Key: FLINK-2354 > URL: https://issues.apache.org/jira/browse/FLINK-2354 > Project: Flink > Issue Type: Sub-task > Components: JobManager > Affects Versions: master > Reporter: Ufuk Celebi > Assignee: Ufuk Celebi > Fix For: 0.10 > > > tl;dr Persist JobGraphs in state backend and coordinate reference to state > handle via ZooKeeper. > Problem: When running multiple JobManagers in high availability mode, the > leading job manager looses all running jobs when it fails. After a new > leading job manager is elected, it is not possible to recover any previously > running jobs. > Solution: The leading job manager, which receives the job graph writes 1) the > job graph to a state backend, and 2) a reference to the respective state > handle to ZooKeeper. In general, job graphs can become large (multiple MBs, > because they include closures etc.). ZooKeeper is not designed for data of > this size. The level of indirection via the reference to the state backend > keeps the data in ZooKeeper small. > Proposed ZooKeeper layout: > /flink (default) > +- currentJobs > +- job id i > +- state handle reference of job graph i > The 'currentJobs' node needs to be persistent to allow recovery of jobs > between job managers. The currentJobs node needs to satisfy the following > invariant: There is a reference to a job graph with id i IFF the respective > job graph needs to be recovered by a newly elected job manager leader. > With this in place, jobs will be recovered from their initial state (as if > resubmitted). The next step is to backup the runtime state handles of > checkpoints in a similar manner. > --- > This work will be based on [~trohrm...@apache.org]'s implementation of > FLINK-2291. The leader election service notifies the job manager about > granted/revoked leadership. This notification happens via Akka and thus > serially *per* job manager, but results in eventually consistent state > between job managers. For some snapshots of time it is possible to have a new > leader granted leadership, before the old one has been revoked its leadership. > [~trohrm...@apache.org], can you confirm that leadership does not guarantee > mutually exclusive access to the shared 'currentJobs' state? > For example, the following can happen: > - JM 1 is leader, JM 2 is standby > - JOB i is running (and hence /flink/currentJobs/i exists) > - ZK notifies leader election service (LES) of JM 1 and JM 2 > - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 > notification revoking leadership takes longer > - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives > final JobStatusChange > - JM 2 resubmits the job /flink/currentJobs/i > - JM 1 removes /flink/currentJobs/i, because it is now finished > => inconsistent state (wrt the specified invariant above) > If it is indeed a problem, we can circumvent this with a Curator recipe for > [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to > coordinate the access to currentJobs. The lock needs to be acquired on > leadership. > --- > Minimum required tests: > - Unit tests for job graph serialization and writing to state backend and > ZooKeeper with expected nodes > - Unit tests for job submission to job manager in leader/non-leader state > - Unit tests for leadership granting/revoking and job submission/restarting > interleavings > - Process failure integration tests with single and multiple running jobs -- This message was sent by Atlassian JIRA (v6.3.4#6332)