[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520539#comment-16520539 ]
ASF GitHub Bot commented on FLINK-9280: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r197493894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -54,18 +69,89 @@ public JobSubmitHandler( @Override protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { - throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + Collection<Path> uploadedFiles = request.getUploadedFiles(); + Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.getFileName().toString(), + entry -> entry + )); + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile); + + Collection<org.apache.flink.core.fs.Path> jarFiles = new ArrayList<>(requestBody.jarFileNames.size()); + for (String jarFileName : requestBody.jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, "Jar", nameToFile); + jarFiles.add(new org.apache.flink.core.fs.Path(jarFile.toString())); + } + + Collection<Tuple2<String, org.apache.flink.core.fs.Path>> artifacts = new ArrayList<>(requestBody.artifactFileNames.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : requestBody.artifactFileNames) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile); + artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString()))); } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + Map<String, DistributedCache.DistributedCacheEntry> temporaryHack = artifacts.stream() + .collect(Collectors.toMap( + tuple -> tuple.f0, + // the actual entry definition is mostly irrelevant as only the blobkey is accessed + // blame whoever wrote the ClientUtils API + tuple -> new DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false) + )); + + // TODO: use executor + CompletableFuture<JobGraph> jobGraphFuture = CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(Files.newInputStream(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }); + + CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture<JobGraph> finalizedJobGraphFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try (BlobClient blobClient = new BlobClient(address, new Configuration())) { + Collection<PermanentBlobKey> jarBlobKeys = ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient); + ClientUtils.setUserJarBlobKeys(jarBlobKeys, jobGraph); + + Collection<Tuple2<String, PermanentBlobKey>> artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), temporaryHack, blobClient); + ClientUtils.setUserArtifactBlobKeys(jobGraph, artifactBlobKeys); + } catch (IOException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", + HttpResponseStatus.INTERNAL_SERVER_ERROR, + e)); --- End diff -- I've 
filed https://issues.apache.org/jira/browse/FLINK-8713. > Extend JobSubmitHandler to accept jar files > ------------------------------------------- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST > Affects Versions: 1.5.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all required jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > the {{JobSubmitHandler}} which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the {{JobSubmitHandler}} to also accept an > optional list of jar files, that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)