[ https://issues.apache.org/jira/browse/FLINK-2804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944694#comment-14944694 ]

ASF GitHub Bot commented on FLINK-2804:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1230

    [FLINK-2804] [client, runtime] Add blocking job submission support in case of JobManager recovery

    This is based on some other PRs. Only review the last commit.
    
    The client's submit-and-wait method now receives a configuration parameter, which it uses to decide how to react to a lost JobManager. In normal operation, nothing changes.
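    A minimal sketch of that decision, assuming the recovery mode is read from the configuration. The names `RecoveryMode`, `Reaction` and `ConnectionLossPolicy` are hypothetical, modelled only on the "recovery mode ZOOKEEPER" line in the test log; they are not the actual Flink API. Without recovery, a lost JobManager still fails the submission, so normal operation is unchanged.

    ```java
    import java.util.Locale;

    class ConnectionLossPolicyDemo {
        public static void main(String[] args) {
            // RETRIEVE_NEW_LEADER
            System.out.println(ConnectionLossPolicy.onConnectionLoss(ConnectionLossPolicy.parseMode("zookeeper")));
            // FAIL_SUBMISSION
            System.out.println(ConnectionLossPolicy.onConnectionLoss(ConnectionLossPolicy.parseMode(null)));
        }
    }

    // Hypothetical recovery modes, modelled on "recovery mode ZOOKEEPER" in the log.
    enum RecoveryMode { STANDALONE, ZOOKEEPER }

    // Hypothetical reactions of a blocking client to a lost JobManager connection.
    enum Reaction { FAIL_SUBMISSION, RETRIEVE_NEW_LEADER }

    final class ConnectionLossPolicy {
        private ConnectionLossPolicy() {}

        // Parses a configured mode string; absent or empty means no recovery.
        // Unknown values make valueOf throw IllegalArgumentException.
        static RecoveryMode parseMode(String configured) {
            if (configured == null || configured.trim().isEmpty()) {
                return RecoveryMode.STANDALONE;
            }
            return RecoveryMode.valueOf(configured.trim().toUpperCase(Locale.ROOT));
        }

        // Only with recovery enabled does the client look for a new leader.
        static Reaction onConnectionLoss(RecoveryMode mode) {
            return mode == RecoveryMode.ZOOKEEPER ? Reaction.RETRIEVE_NEW_LEADER : Reaction.FAIL_SUBMISSION;
        }
    }
    ```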
    
    The following is the log output of a test for this change. The test is tailored to the current state of job graph recovery. I think there is room for future improvement in the messages the client receives on recovery. Currently, the JobManager simply assumes that the same job client actor is still waiting for updates (that's why you see multiple submission messages).
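    To illustrate why the repeated submission messages are harmless, here is a hypothetical sketch of a client-side listener that keeps waiting on the same actor across leader changes. The `JobMessage` enum and `JobListener` class are assumptions for illustration, not Flink's message classes; the message sequence is modelled on the log, which shows one "Job was successfully submitted" line per JobManager that ran the job.

    ```java
    import java.util.Arrays;
    import java.util.List;

    class ListenerDemo {
        public static void main(String[] args) {
            // Sequence modelled on the test log: one submission ack per JobManager.
            List<JobMessage> messages = Arrays.asList(
                    JobMessage.SUBMIT_SUCCESS, JobMessage.RUNNING,
                    JobMessage.SUBMIT_SUCCESS, JobMessage.RUNNING,
                    JobMessage.SUBMIT_SUCCESS, JobMessage.RUNNING, JobMessage.FINISHED);
            JobListener listener = new JobListener();
            for (JobMessage message : messages) {
                if (listener.onMessage(message)) {
                    break;
                }
            }
            // prints "3 acks, final status FINISHED"
            System.out.println(listener.submissionAcks() + " acks, final status " + listener.finalStatus());
        }
    }

    // Hypothetical message types; not Flink's actual message classes.
    enum JobMessage { SUBMIT_SUCCESS, RUNNING, FINISHED, FAILED, CANCELED }

    final class JobListener {
        private int submissionAcks;
        private JobMessage finalStatus;

        // Returns true once a terminal job status has been received.
        boolean onMessage(JobMessage message) {
            switch (message) {
                case SUBMIT_SUCCESS:
                    // After recovery the new leader acknowledges the job again to the
                    // same actor, so a repeated ack is expected rather than an error.
                    submissionAcks++;
                    return false;
                case FINISHED:
                case FAILED:
                case CANCELED:
                    finalStatus = message;
                    return true;
                default:
                    // Intermediate status update (e.g. RUNNING): keep waiting.
                    return false;
            }
        }

        int submissionAcks() { return submissionAcks; }

        JobMessage finalStatus() { return finalStatus; }
    }
    ```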
    
    The most important part to review is the retry loop in `JobClient#submitJobAndWait`. @tillrohrmann, can you please review this?
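    Such a retry loop could look roughly like the following sketch. It assumes hypothetical `LeaderRetriever` and `JobClientActor` interfaces standing in for leader retrieval and the job client actor, plus a hypothetical `maxRetries` bound that the PR description does not mention. On a lost connection with recovery enabled, it retrieves the new leader, updates the same actor, and goes back to waiting, mirroring the "Lost connection to job manager. Retrieving new one." and "Updating job client actor with new job manager." lines in the test log.

    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.List;
    import java.util.concurrent.TimeoutException;

    class RetryLoopDemo {
        public static void main(String[] args) throws Exception {
            // Two simulated leader losses, then the job finishes.
            Deque<String> leaders = new ArrayDeque<>(Arrays.asList("jm-1", "jm-2"));
            FakeActor actor = new FakeActor(2);
            String result = BlockingSubmission.submitJobAndWait(actor, leaders::poll, true, 5);
            System.out.println(result + " via " + actor.jobManagers); // FINISHED via [jm-1, jm-2]
        }
    }

    // Hypothetical: signals that the connection to the JobManager was lost.
    class ConnectionLostException extends Exception {
        ConnectionLostException(String message) { super(message); }
    }

    // Hypothetical: looks up the current leader (e.g. via ZooKeeper).
    interface LeaderRetriever {
        String retrieveLeaderAddress() throws TimeoutException;
    }

    // Hypothetical: the actor that receives job updates on behalf of the client.
    interface JobClientActor {
        void updateJobManager(String address);

        // Blocks until the job reaches a terminal state or the connection is lost.
        String awaitResult() throws ConnectionLostException;
    }

    final class BlockingSubmission {
        private BlockingSubmission() {}

        static String submitJobAndWait(JobClientActor actor, LeaderRetriever retriever,
                                       boolean recoveryEnabled, int maxRetries) throws Exception {
            int retries = 0;
            while (true) {
                try {
                    return actor.awaitResult();
                } catch (ConnectionLostException e) {
                    // Without recovery (or after too many retries) keep the old behaviour: fail.
                    if (!recoveryEnabled || retries >= maxRetries) {
                        throw e;
                    }
                    retries++;
                    // Point the same actor at the new leader, then wait again.
                    actor.updateJobManager(retriever.retrieveLeaderAddress());
                }
            }
        }
    }

    // Test double: loses the connection a fixed number of times, then finishes.
    final class FakeActor implements JobClientActor {
        final List<String> jobManagers = new ArrayList<>();
        private int remainingLosses;

        FakeActor(int losses) { this.remainingLosses = losses; }

        @Override
        public void updateJobManager(String address) { jobManagers.add(address); }

        @Override
        public String awaitResult() throws ConnectionLostException {
            if (remainingLosses > 0) {
                remainingLosses--;
                throw new ConnectionLostException("lost connection to JobManager");
            }
            return "FINISHED";
        }
    }
    ```

    With two simulated leader losses the demo finishes on the third (simulated) JobManager, analogous to the test log, where the job finishes on JobManager process 2.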
    
    ```
    1 > 10:21:48,754 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - ================================================================================
    1 > 10:21:48,756 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Test testJobManagerConnectionLoss(org.apache.flink.runtime.client.JobClientRecoveryITCase) is running.
    1 > 10:21:48,756 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - --------------------------------------------------------------------------------
    1 > 10:21:48,970 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started JobManagerProcess(id=0, port=52432).
    1 > 10:21:49,037 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started JobManagerProcess(id=1, port=52433).
    1 > 10:21:49,104 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started JobManagerProcess(id=2, port=52434).
    1 > 10:21:49,863 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Started taskmanager.
    1 > 10:21:49,899 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for leader retrieval.
    1 > 10:21:50,549 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as akka.tcp://flink@127.0.0.1:52432/user/jobmanager. Trying to determine, which job manager process this is.
    1 > 10:21:50,555 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as process 0.
    1 > 10:21:50,716 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for task manager to connect.
    1 > 10:21:50,724 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Task manager connected.
    1 > 10:21:50,726 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for first execution of job graph.
    1 > 10:21:50,726 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Submitting blocking twice job.
    1 > 10:21:50,729 INFO  org.apache.flink.runtime.client.JobClient                     - Submitting job f56fb890afbddc1f25ab5d0e56771cad in recovery mode ZOOKEEPER
    1 > 10:21:50,732 INFO  org.apache.flink.runtime.client.JobClient                     - Sending message to JobManager akka.tcp://flink@127.0.0.1:52432/user/jobmanager to submit job blocking job graph (f56fb890afbddc1f25ab5d0e56771cad) and wait for progress
    1 > 10:21:50,846 INFO  org.apache.flink.runtime.client.JobClient                     - Job was successfully submitted to the JobManager
    1 > 10:21:50,849 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50  Job execution switched to status RUNNING.
    1 > 10:21:50,854 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50  blocking vertex(1/1) switched to SCHEDULED
    1 > 10:21:50,855 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50  blocking vertex(1/1) switched to DEPLOYING
    1 > 10:21:50,876 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:21:50  blocking vertex(1/1) switched to RUNNING
    1 > 10:21:50,934 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Killing the leader.
    1 > 10:21:50,935 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for leader retrieval
    1 > 10:21:57,022 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as akka.tcp://flink@127.0.0.1:52433/user/jobmanager. Trying to determine, which job manager process this is.
    1 > 10:21:57,022 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Leader determined as process 1
    1 > 10:21:57,022 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for second execution of job graph.
    1 > 10:21:59,907 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to JobManager akka.tcp://flink@127.0.0.1:52432/user/jobmanager
    1 > 10:21:59,908 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to job manager. Retrieving new one.
    1 > 10:21:59,930 INFO  org.apache.flink.runtime.client.JobClient                     - New job manager address akka.tcp://flink@127.0.0.1:52433/user/jobmanager and id 43147f59-30bc-4a9c-a5db-43136d14b409
    1 > 10:21:59,930 INFO  org.apache.flink.runtime.client.JobClient                     - Updating job client actor with new job manager.
    1 > 10:22:07,137 INFO  org.apache.flink.runtime.client.JobClient                     - Job was successfully submitted to the JobManager
    1 > 10:22:07,141 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07  Job execution switched to status RUNNING.
    1 > 10:22:07,144 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07  blocking vertex(1/1) switched to SCHEDULED
    1 > 10:22:07,145 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07  blocking vertex(1/1) switched to DEPLOYING
    1 > 10:22:07,157 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:07  blocking vertex(1/1) switched to RUNNING
    1 > 10:22:07,241 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Killing the leader.
    1 > 10:22:07,241 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Waiting for job submission to return.
    1 > 10:22:19,906 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to JobManager akka.tcp://flink@127.0.0.1:52433/user/jobmanager
    1 > 10:22:19,907 INFO  org.apache.flink.runtime.client.JobClient                     - Lost connection to job manager. Retrieving new one.
    1 > 10:22:19,926 INFO  org.apache.flink.runtime.client.JobClient                     - New job manager address akka.tcp://flink@127.0.0.1:52434/user/jobmanager and id d195f698-9cbd-4695-8809-8c93bfe96fac
    1 > 10:22:19,926 INFO  org.apache.flink.runtime.client.JobClient                     - Updating job client actor with new job manager.
    1 > 10:22:25,102 INFO  org.apache.flink.runtime.client.JobClient                     - Job was successfully submitted to the JobManager
    1 > 10:22:25,105 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25  Job execution switched to status RUNNING.
    1 > 10:22:25,108 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25  blocking vertex(1/1) switched to SCHEDULED
    1 > 10:22:25,108 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25  blocking vertex(1/1) switched to DEPLOYING
    1 > 10:22:25,119 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25  blocking vertex(1/1) switched to RUNNING
    1 > 10:22:25,121 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25  blocking vertex(1/1) switched to FINISHED
    1 > 10:22:25,122 INFO  org.apache.flink.runtime.client.JobClient                     - 10/06/2015 10:22:25  Job execution switched to status FINISHED.
    1 > 10:22:25,135 INFO  org.apache.flink.runtime.client.JobClient                     - Job execution complete
    1 > 10:22:25,135 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Job submission returned.
    1 > 10:22:25,137 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - --------------------------------------------------------------------------------
    1 > 10:22:25,137 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - Test testJobManagerConnectionLoss(org.apache.flink.runtime.client.JobClientRecoveryITCase) successfully run.
    1 > 10:22:25,137 INFO  org.apache.flink.runtime.client.JobClientRecoveryITCase       - ================================================================================
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink client-recovery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1230.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1230
    
----
commit 690822a3a8d933f3e316e577cbd632dc4c30dd6b
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-03T13:13:28Z

    [runtime] Add type parameter to ByteStreamStateHandle

commit 38e3f303362c6032162b47feaed145dcab2e247b
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-19T17:53:18Z

    [clients] Submit job detached if recovery enabled

commit 9a5e119a8ab82c2dc63bee64691e38911413b49c
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-20T11:08:24Z

    [FLINK-2652] [tests] Temporary ignore flakey PartitionRequestClientFactoryTest

commit 2ef10b77cbb12b79414809cff2f0c1e162823ed8
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-01T15:25:46Z

    [FLINK-2354] [runtime] Add job graph and checkpoint recovery
    
    Sync shutdown of CheckpointCoordinator in ExecutionGraph
    
    - The CheckpointCoordinator was shutdown asynchronously
    - This could result in a NPE when the ExecutionGraph is archived (sets the exec
      context to null)
    - Instead of synchronizing this, just do the blocking ZooKeeper operations
      async (one of which was already async)
    
    Fix resource leak during concurrent removal

commit a50899b5fea4289c51ff09395b6300e5ff410f05
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-30T14:38:37Z

    [FLINK-2792] [jobmanager, logging] Set actor message log level to TRACE

commit e85c179998971b10e8af1b235919ae04819e6c05
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-30T14:38:57Z

    [tests] Add ChaosMonkeyTest

commit c96c11bb673229881ff4167fd1260c7baca4db42
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-10-05T08:05:05Z

    [FLINK-2805] [blobmanager] Write JARs to file state backend for recovery

commit 52e5ec33a317a8c24c45a837797409c0749bb5c7
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-10-05T22:38:38Z

    [FLINK-2804] [client, runtime] Add blocking job submission support in case of JobManager recovery

----


> Support blocking job submission with Job Manager recovery
> ---------------------------------------------------------
>
>                 Key: FLINK-2804
>                 URL: https://issues.apache.org/jira/browse/FLINK-2804
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: master
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>            Priority: Minor
>
> Submitting a job in blocking mode with JobManager recovery enabled fails on the
> client side (the one submitting the job) when the JobManager fails, although the
> job itself continues to be recovered.
> I propose adding simple support for re-retrieving the leading JobManager,
> updating the client actor with it, and then waiting for the result as before.
> As of the current state of PR #1153
> (https://github.com/apache/flink/pull/1153), the JobManager assumes that the
> same actor is still running and simply keeps sending it execution state updates
> etc. (if the listening behaviour is not detached).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
