[ https://issues.apache.org/jira/browse/FLINK-2804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944694#comment-14944694 ]
ASF GitHub Bot commented on FLINK-2804:
---------------------------------------

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/1230

[FLINK-2804] [client, runtime] Add blocking job submission support in case of JobManager recovery

This is based on some other PRs. Only review the last commit.

The client's submit-and-wait method now gets a configuration parameter, which it uses to determine how to react to a lost JobManager. In normal operation, nothing changes.

Below is the log output of a test for this. The behaviour is tailored to the current state of job graph recovery, and I think the messages the client gets on recovery leave some room for improvement in the future. Currently, the JobManager simply assumes that the same job client actor is still waiting for updates (that's why you see multiple "Job was successfully submitted to the JobManager" messages).

The most important part to review is the retry loop in `submitJobAndWait#submitJobAndWait`.

@tillrohrmann, can you review this please?

```
1 > 10:21:48,754 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - ================================================================================
1 > 10:21:48,756 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Test testJobManagerConnectionLoss(org.apache.flink.runtime.client.JobClientRecoveryITCase) is running.
1 > 10:21:48,756 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - --------------------------------------------------------------------------------
1 > 10:21:48,970 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Started JobManagerProcess(id=0, port=52432).
1 > 10:21:49,037 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Started JobManagerProcess(id=1, port=52433).
1 > 10:21:49,104 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Started JobManagerProcess(id=2, port=52434).
1 > 10:21:49,863 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Started taskmanager.
1 > 10:21:49,899 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Waiting for leader retrieval.
1 > 10:21:50,549 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Leader determined as akka.tcp://flink@127.0.0.1:52432/user/jobmanager. Trying to determine, which job manager process this is.
1 > 10:21:50,555 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Leader determined as process 0.
1 > 10:21:50,716 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Waiting for task manager to connect.
1 > 10:21:50,724 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Task manager connected.
1 > 10:21:50,726 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Waiting for first execution of job graph.
1 > 10:21:50,726 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Submitting blocking twice job.
1 > 10:21:50,729 INFO org.apache.flink.runtime.client.JobClient - Submitting job f56fb890afbddc1f25ab5d0e56771cad in recovery mode ZOOKEEPER
1 > 10:21:50,732 INFO org.apache.flink.runtime.client.JobClient - Sending message to JobManager akka.tcp://flink@127.0.0.1:52432/user/jobmanager to submit job blocking job graph (f56fb890afbddc1f25ab5d0e56771cad) and wait for progress
1 > 10:21:50,846 INFO org.apache.flink.runtime.client.JobClient - Job was successfully submitted to the JobManager
1 > 10:21:50,849 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:21:50 Job execution switched to status RUNNING.
1 > 10:21:50,854 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:21:50 blocking vertex(1/1) switched to SCHEDULED
1 > 10:21:50,855 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:21:50 blocking vertex(1/1) switched to DEPLOYING
1 > 10:21:50,876 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:21:50 blocking vertex(1/1) switched to RUNNING
1 > 10:21:50,934 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Killing the leader.
1 > 10:21:50,935 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Waiting for leader retrieval
1 > 10:21:57,022 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Leader determined as akka.tcp://flink@127.0.0.1:52433/user/jobmanager. Trying to determine, which job manager process this is.
1 > 10:21:57,022 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Leader determined as process 1
1 > 10:21:57,022 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Waiting for second execution of job graph.
1 > 10:21:59,907 INFO org.apache.flink.runtime.client.JobClient - Lost connection to JobManager akka.tcp://flink@127.0.0.1:52432/user/jobmanager
1 > 10:21:59,908 INFO org.apache.flink.runtime.client.JobClient - Lost connection to job manager. Retrieving new one.
1 > 10:21:59,930 INFO org.apache.flink.runtime.client.JobClient - New job manager address akka.tcp://flink@127.0.0.1:52433/user/jobmanager and id 43147f59-30bc-4a9c-a5db-43136d14b409
1 > 10:21:59,930 INFO org.apache.flink.runtime.client.JobClient - Updating job client actor with new job manager.
1 > 10:22:07,137 INFO org.apache.flink.runtime.client.JobClient - Job was successfully submitted to the JobManager
1 > 10:22:07,141 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:07 Job execution switched to status RUNNING.
1 > 10:22:07,144 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:07 blocking vertex(1/1) switched to SCHEDULED
1 > 10:22:07,145 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:07 blocking vertex(1/1) switched to DEPLOYING
1 > 10:22:07,157 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:07 blocking vertex(1/1) switched to RUNNING
1 > 10:22:07,241 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Killing the leader.
1 > 10:22:07,241 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Waiting for job submission to return.
1 > 10:22:19,906 INFO org.apache.flink.runtime.client.JobClient - Lost connection to JobManager akka.tcp://flink@127.0.0.1:52433/user/jobmanager
1 > 10:22:19,907 INFO org.apache.flink.runtime.client.JobClient - Lost connection to job manager. Retrieving new one.
1 > 10:22:19,926 INFO org.apache.flink.runtime.client.JobClient - New job manager address akka.tcp://flink@127.0.0.1:52434/user/jobmanager and id d195f698-9cbd-4695-8809-8c93bfe96fac
1 > 10:22:19,926 INFO org.apache.flink.runtime.client.JobClient - Updating job client actor with new job manager.
1 > 10:22:25,102 INFO org.apache.flink.runtime.client.JobClient - Job was successfully submitted to the JobManager
1 > 10:22:25,105 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:25 Job execution switched to status RUNNING.
1 > 10:22:25,108 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:25 blocking vertex(1/1) switched to SCHEDULED
1 > 10:22:25,108 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:25 blocking vertex(1/1) switched to DEPLOYING
1 > 10:22:25,119 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:25 blocking vertex(1/1) switched to RUNNING
1 > 10:22:25,121 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:25 blocking vertex(1/1) switched to FINISHED
1 > 10:22:25,122 INFO org.apache.flink.runtime.client.JobClient - 10/06/2015 10:22:25 Job execution switched to status FINISHED.
1 > 10:22:25,135 INFO org.apache.flink.runtime.client.JobClient - Job execution complete
1 > 10:22:25,135 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Job submission returned.
1 > 10:22:25,137 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - --------------------------------------------------------------------------------
1 > 10:22:25,137 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - Test testJobManagerConnectionLoss(org.apache.flink.runtime.client.JobClientRecoveryITCase) successfully run.
1 > 10:22:25,137 INFO org.apache.flink.runtime.client.JobClientRecoveryITCase - ================================================================================
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink client-recovery

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1230.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1230

----
commit 690822a3a8d933f3e316e577cbd632dc4c30dd6b
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-03T13:13:28Z

[runtime] Add type parameter to ByteStreamStateHandle

commit 38e3f303362c6032162b47feaed145dcab2e247b
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-19T17:53:18Z

[clients] Submit job detached if recovery enabled

commit 9a5e119a8ab82c2dc63bee64691e38911413b49c
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-20T11:08:24Z

[FLINK-2652] [tests] Temporary ignore flakey PartitionRequestClientFactoryTest

commit 2ef10b77cbb12b79414809cff2f0c1e162823ed8
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-01T15:25:46Z

[FLINK-2354] [runtime] Add job graph and checkpoint recovery

Sync shutdown of CheckpointCoordinator in ExecutionGraph

- The CheckpointCoordinator was shutdown asynchronously
- This could result in a NPE when the ExecutionGraph is archived (sets the exec context to null)
- Instead of synchronizing this, just do the blocking ZooKeeper operations async (one of which was already async)

Fix resource leak during concurrent removal

commit a50899b5fea4289c51ff09395b6300e5ff410f05
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-30T14:38:37Z

[FLINK-2792] [jobmanager, logging] Set actor message log level to TRACE

commit e85c179998971b10e8af1b235919ae04819e6c05
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-30T14:38:57Z

[tests] Add ChaosMonkeyTest

commit c96c11bb673229881ff4167fd1260c7baca4db42
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-10-05T08:05:05Z

[FLINK-2805] [blobmanager] Write JARs to file state backend for recovery

commit 52e5ec33a317a8c24c45a837797409c0749bb5c7
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-10-05T22:38:38Z

[FLINK-2804] [client, runtime] Add blocking job submission support in case of JobManager recovery

----

> Support blocking job submission with Job Manager recovery
> ---------------------------------------------------------
>
>                 Key: FLINK-2804
>                 URL: https://issues.apache.org/jira/browse/FLINK-2804
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>            Priority: Minor
>
> Submitting a job in a blocking fashion with JobManager recovery and a failing
> JobManager fails on the client side (the one submitting the job). The job
> still continues to be recovered.
> I propose to add simple support to re-retrieve the leading job manager and
> update the client actor with it and then wait for the result as before.
> As of the current standing in PR #1153
> (https://github.com/apache/flink/pull/1153) the job manager assumes that the
> same actor is running and just keeps on sending execution state updates etc.
> (if the listening behaviour is not detached).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
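The retry loop described in the pull request and in the issue (re-retrieve the leading JobManager, point the waiting client at it, then wait for the result as before) can be sketched as follows. This is a minimal, hypothetical Python simulation under stated assumptions, not Flink's `JobClient` code: `SimulatedJobManager`, `JobManagerLost`, `submit_job_and_wait` and the `max_attempts` limit are illustrative names, and the `recovery_mode` strings are assumed (`"ZOOKEEPER"` appears in the test log; `"STANDALONE"` stands in for "recovery disabled").

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


class JobManagerLost(Exception):
    """Signals that the client lost its connection to the current leader."""


@dataclass
class SimulatedJobManager:
    """Hypothetical stand-in for a JobManager process (not a Flink class)."""

    address: str
    crashes: bool  # True: this process is killed while the job is running

    def wait_for_result(self, job_id: str) -> str:
        if self.crashes:
            raise JobManagerLost(self.address)
        return "FINISHED"


def submit_job_and_wait(
    job_id: str,
    recovery_mode: str,
    retrieve_leader: Callable[[], SimulatedJobManager],
    max_attempts: int = 5,
) -> Tuple[str, List[str]]:
    """Block until the job finishes, following leader changes if recovery is on."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    events: List[str] = []
    leader = retrieve_leader()
    for attempt in range(1, max_attempts + 1):
        events.append(f"waiting on {leader.address}")
        try:
            return leader.wait_for_result(job_id), events
        except JobManagerLost as lost:
            # Without recovery, or once the attempts are used up, the lost
            # JobManager fails the blocking call, as it did before the change.
            if recovery_mode != "ZOOKEEPER" or attempt == max_attempts:
                raise
            events.append(f"lost connection to {lost}; retrieving new leader")
            # Re-retrieve the leader and keep waiting. The job is NOT resubmitted:
            # the new leader is assumed to recover it via job graph recovery.
            leader = retrieve_leader()
    raise AssertionError("unreachable: the loop always returns or raises")


if __name__ == "__main__":
    # Mirrors the test log: leaders 0 and 1 are killed, leader 2 finishes the job.
    leaders = iter([
        SimulatedJobManager("jobmanager-0", crashes=True),
        SimulatedJobManager("jobmanager-1", crashes=True),
        SimulatedJobManager("jobmanager-2", crashes=False),
    ])
    status, events = submit_job_and_wait("blocking-job", "ZOOKEEPER", lambda: next(leaders))
    print(status)
    for event in events:
        print(event)
```

Waiting again with the same job ID, rather than resubmitting, matches the note in the issue that the new JobManager keeps sending execution state updates to the same client actor.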