[ https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426486#comment-15426486 ]
ASF GitHub Bot commented on FLINK-4273: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75310003 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future<Object> submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** + * Attaches to a running Job using the JobID. + * Reconstructs the user class loader by downloading the jars from the JobManager. + * @throws JobRetrievalException if anything goes wrong while retrieving the job + */ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); + LOG.info("Reconstructed class loader for Job {}" , jobID); + } catch (Exception e) { + LOG.warn("Couldn't retrieve classloader for {}. 
Using system class loader", jobID, e); + classloader = JobClient.class.getClassLoader(); + } + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job submission, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobClientActor.createJobClientActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future<Object> attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + classloader); + } + + /** + * Reconstructs the class loader by first requesting information about it at the JobManager + * and then downloading missing jar files. + * @param jobID id of job + * @param jobManager gateway to the JobManager + * @param config the flink configuration + * @param timeout timeout for querying the jobmanager + * @return A classloader that should behave like the original classloader + * @throws JobRetrievalException if anything goes wrong + */ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config, + FiniteDuration timeout) + throws JobRetrievalException { + + final Object jmAnswer; + try { + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "JobManager didn't respond", e); + } + + if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) { + JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer); + + Option<String> jmHost = jobManager.actor().path().address().host(); + String jmHostname = jmHost.isDefined() ? 
jmHost.get() : "localhost"; + InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort()); + final BlobCache blobClient = new BlobCache(serverAddress, config);
--- End diff --

Does it make sense to clean up this `BlobCache` once the job execution result has been delivered?

> Refactor JobClientActor to watch already submitted jobs
> --------------------------------------------------------
>
> Key: FLINK-4273
> URL: https://issues.apache.org/jira/browse/FLINK-4273
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for
> the result. This process should be broken up into a submission process and a
> waiting process which can both be entered independently. This leads to two
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)