[ 
https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426486#comment-15426486
 ] 

ASF GitHub Bot commented on FLINK-4273:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75310003
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
                        sysoutLogUpdates);
     
                ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -           
    +
    +           Future<Object> submissionFuture = Patterns.ask(
    +                           jobClientActor,
    +                           new JobClientMessages.SubmitJobAndWait(jobGraph),
    +                           new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +           return new JobListeningContext(
    +                           jobGraph.getJobID(),
    +                           submissionFuture,
    +                           jobClientActor,
    +                           classLoader);
    +   }
    +
    +
    +   /**
    +    * Attaches to a running Job using the JobID.
    +    * Reconstructs the user class loader by downloading the jars from the JobManager.
    +    * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +    */
    +   public static JobListeningContext attachToRunningJob(
    +                   JobID jobID,
    +                   ActorGateway jobManagerGateWay,
    +                   Configuration configuration,
    +                   ActorSystem actorSystem,
    +                   LeaderRetrievalService leaderRetrievalService,
    +                   FiniteDuration timeout,
    +                   boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +           checkNotNull(jobID, "The jobID must not be null.");
    +           checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +           checkNotNull(configuration, "The configuration must not be null.");
    +           checkNotNull(actorSystem, "The actorSystem must not be null.");
    +           checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
    +           checkNotNull(timeout, "The timeout must not be null.");
    +
    +           // retrieve classloader first before doing anything
    +           ClassLoader classloader;
    +           try {
    +                   classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +                   LOG.info("Reconstructed class loader for Job {}" , jobID);
    +           } catch (Exception e) {
    +                   LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +                   classloader = JobClient.class.getClassLoader();
    +           }
    +
    +           // we create a proxy JobClientActor that deals with all communication with
    +           // the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +           // update messages, watches for disconnect between client and JobManager, ...
    +           Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +                   leaderRetrievalService,
    +                   timeout,
    +                   sysoutLogUpdates);
    +
    +           ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +           Future<Object> attachmentFuture = Patterns.ask(
    +                           jobClientActor,
    +                           new JobClientMessages.AttachToJobAndWait(jobID),
    +                           new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +           return new JobListeningContext(
    +                           jobID,
    +                           attachmentFuture,
    +                           jobClientActor,
    +                           classloader);
    +   }
    +
    +   /**
    +    * Reconstructs the class loader by first requesting information about it at the JobManager
    +    * and then downloading missing jar files.
    +    * @param jobID id of job
    +    * @param jobManager gateway to the JobManager
    +    * @param config the flink configuration
    +    * @param timeout timeout for querying the jobmanager
    +    * @return A classloader that should behave like the original classloader
    +    * @throws JobRetrievalException if anything goes wrong
    +    */
    +   public static ClassLoader retrieveClassLoader(
    +           JobID jobID,
    +           ActorGateway jobManager,
    +           Configuration config,
    +           FiniteDuration timeout)
    +           throws JobRetrievalException {
    +
    +           final Object jmAnswer;
    +           try {
    +                   jmAnswer = Await.result(
    +                           jobManager.ask(
    +                                   new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +           } catch (Exception e) {
    +                   throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +           }
    +
    +           if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +                   JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +                   Option<String> jmHost = jobManager.actor().path().address().host();
    +                   String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +                   InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +                   final BlobCache blobClient = new BlobCache(serverAddress, config);
    --- End diff --
    
    Does it make sense to clean up this `BlobCache` once the job execution result has been delivered?
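
One possible shape for such a cleanup, as a rough sketch only: it uses plain `java.util.concurrent.CompletableFuture` instead of the Scala futures in the diff, and `FakeBlobCache` and `closeWhenDone` are hypothetical names invented for illustration, not Flink's `BlobCache` API or anything in this pull request. The idea is simply to tie the cache's lifetime to the completion of the result future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch only: closes a cache-like resource once a job result has been delivered. */
class BlobCleanupSketch {

    /** Hypothetical stand-in for a blob cache; this is NOT Flink's BlobCache. */
    static final class FakeBlobCache implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }

        public boolean isClosed() {
            return closed.get();
        }
    }

    /**
     * Returns a future with the same outcome as resultFuture. The cache is
     * closed before the returned future completes, on success and on failure.
     */
    static <T> CompletableFuture<T> closeWhenDone(
            CompletableFuture<T> resultFuture, FakeBlobCache cache) {
        return resultFuture.whenComplete((result, failure) -> cache.close());
    }

    public static void main(String[] args) {
        FakeBlobCache cache = new FakeBlobCache();
        CompletableFuture<String> jobResult = new CompletableFuture<>();
        CompletableFuture<String> guarded = closeWhenDone(jobResult, cache);

        // Simulate delivery of the job execution result.
        jobResult.complete("job finished");
        System.out.println(guarded.join() + ", cache closed: " + cache.isClosed());
    }
}
```

In the diff above, the equivalent hook would presumably have to live wherever the result of `attachmentFuture` is consumed, since `retrieveClassLoader` returns before the job finishes.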


> Refactor JobClientActor to watch already submitted jobs 
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for 
> the result. This process should be broken up into a submission process and a 
> waiting process which can both be entered independently. This leads to two 
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait
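
The two entry points described above could look roughly like the following sketch. It is a toy in-memory model, not the JobClientActor: the class `EntryPointsSketch`, its methods, and the use of `String` job IDs and results are all assumptions made for illustration. It shows only that submission and retrieval both yield a handle that feeds the same waiting step.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only: two independent entry points that share one waiting phase. */
class EntryPointsSketch {

    /** Hypothetical stand-in for the cluster: pending results keyed by job ID. */
    private final Map<String, CompletableFuture<String>> running = new ConcurrentHashMap<>();

    /** Entry point 1: submit a new job and obtain a handle to wait on. */
    CompletableFuture<String> submit(String jobId) {
        CompletableFuture<String> handle = new CompletableFuture<>();
        if (running.putIfAbsent(jobId, handle) != null) {
            throw new IllegalStateException("Job already submitted: " + jobId);
        }
        return handle;
    }

    /** Entry point 2: attach to a job that was submitted earlier. */
    CompletableFuture<String> retrieve(String jobId) {
        CompletableFuture<String> handle = running.get(jobId);
        if (handle == null) {
            throw new IllegalArgumentException("Unknown job: " + jobId);
        }
        return handle;
    }

    /** Shared waiting phase: blocks until the job result is available. */
    static String awaitResult(CompletableFuture<String> handle) {
        return handle.join();
    }

    /** Test hook that simulates the job finishing on the cluster side. */
    void finish(String jobId, String outcome) {
        retrieve(jobId).complete(outcome);
    }

    public static void main(String[] args) {
        EntryPointsSketch client = new EntryPointsSketch();
        CompletableFuture<String> submitted = client.submit("job-1");  // 1) submit(job) -> wait
        CompletableFuture<String> attached = client.retrieve("job-1"); // 2) retrieve(jobID) -> wait
        client.finish("job-1", "FINISHED");
        System.out.println(awaitResult(submitted) + " / " + awaitResult(attached));
    }
}
```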



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
