[ https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420945#comment-15420945 ]
ASF GitHub Bot commented on FLINK-4273: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r74757666 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future<Object> submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** + * Attaches to a running Job using the JobID. + * Reconstructs the user class loader by downloading the jars from the JobManager. + * @throws JobRetrievalException if anything goes wrong while retrieving the job + */ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); + LOG.info("Reconstructed class loader for Job {}" , jobID); + } catch (Exception e) { + LOG.warn("Couldn't retrieve classloader for {}. 
Using system class loader", jobID, e); + classloader = JobClient.class.getClassLoader(); + } + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job submission, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobClientActor.createJobClientActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future<Object> attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + classloader); + } + + /** + * Reconstructs the class loader by first requesting information about it at the JobManager + * and then downloading missing jar files. + * @param jobID id of job + * @param jobManager gateway to the JobManager + * @param config the flink configuration + * @param timeout timeout for querying the jobmanager + * @return A classloader that should behave like the original classloader + * @throws JobRetrievalException if anything goes wrong + */ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config, + FiniteDuration timeout) + throws JobRetrievalException { + + BlobCache blobClient = null; + try { + final Object jmAnswer; + try { + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout); + } catch (Exception e) { + throw new JobRetrievalException(jobID, "JobManager didn't respond", e); + } + + if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) { + JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer); + + Option<String> jmHost = jobManager.actor().path().address().host(); + String 
jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
+                InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
+                blobClient = new BlobCache(serverAddress, config);
+
+                final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
+                final List<URL> requiredClasspaths = props.requiredClasspaths();
+
+                final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
+
+                int pos = 0;
+                for (BlobKey blobKey : props.requiredJarFiles()) {
+                    try {
+                        allURLs[pos++] = blobClient.getURL(blobKey);
+                    } catch (Exception e) {
+                        throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
+                    }
+                }
+
+                for (URL url : requiredClasspaths) {
+                    allURLs[pos++] = url;
+                }
+
+                return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
+            } else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
+                throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
+            } else {
+                throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
+            }
+        } finally {
+            if (blobClient != null) {
+                blobClient.shutdown();
--- End diff --

Isn't this call deleting all the blobs we've downloaded again?

> Refactor JobClientActor to watch already submitted jobs
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for
> the result. This process should be broken up into a submission process and a
> waiting process which can both be entered independently. This leads to two
> different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait

--
This message was sent by Atlassian JIRA (v6.3.4#6332)