Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195087298 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -65,7 +86,96 @@ public JobSubmitHandler( e); } - return gateway.submitJob(jobGraph, timeout) + updateJarEntriesInJobGraph(jobGraph, requestBody.getUploadedJars(), log); + updateUserArtifactEntriesInJobGraph(jobGraph, requestBody.getUploadedArtifacts(), log); + + CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout); + + CompletableFuture<JobGraph> jobGraphFuture = blobServerPortFuture.thenApply(blobServerPort -> { + final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort); + final List<PermanentBlobKey> keys; + try { + keys = BlobClient.uploadFiles(address, config, jobGraph.getJobID(), jobGraph.getUserJars()); + jobGraph.uploadUserArtifacts(address, config); + } catch (IOException ioe) { + log.error("Could not upload job jar files.", ioe); + throw new CompletionException(new RestHandlerException("Could not upload job jar files.", HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } + + for (PermanentBlobKey key : keys) { + jobGraph.addUserJarBlobKey(key); + } + + return jobGraph; + }); + + CompletableFuture<JobSubmitResponseBody> submissionFuture = jobGraphFuture + .thenCompose(finalizedJobGraph -> gateway.submitJob(jobGraph, timeout)) .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); + + CompletableFuture<Void> submissionCleanupFuture = submissionFuture.thenRun(requestBody::cleanup); + + return submissionFuture.thenCombine(submissionCleanupFuture, (responseBody, ignored) -> responseBody); + } + + /** + * Updates the jar entries in the given JobGraph to refer to the uploaded jar files instead of client-local files. + */ + private static void updateJarEntriesInJobGraph(JobGraph jobGraph, Collection<Path> uploadedJars, Logger log) { + // the existing entries still reference client-local jars + jobGraph.getUserJars().clear(); + for (Path jar : uploadedJars) { + log.debug("Adding jar {} to JobGraph({}).", jar, jobGraph.getJobID()); + jobGraph.addJar(new org.apache.flink.core.fs.Path(jar.toUri())); + } + } + + /** + * Updates the user-artifact entries in the given JobGraph to refer to the uploaded artifacts instead of client-local artifacts. + */ + private static void updateUserArtifactEntriesInJobGraph(JobGraph jobGraph, Collection<Path> uploadedArtifacts, Logger log) { + // match the names of uploaded files to the names stored in the distributed cache entries to find entries we have to override + + // create a new map from file name -> distributed cache map entry + Map<String, Tuple2<String, DistributedCache.DistributedCacheEntry>> remappedArtifactEntries = jobGraph.getUserArtifacts().entrySet().stream() + .collect(Collectors.toMap( + entry -> new org.apache.flink.core.fs.Path(entry.getValue().filePath).getName(), + entry -> Tuple2.of(entry.getKey(), entry.getValue()) + )); + // create a new map from file name -> local file + Map<String, Path> mappedUploadedArtifacts = uploadedArtifacts.stream() + .collect(Collectors.toMap( + artifact -> new org.apache.flink.core.fs.Path(artifact.toUri()).getName(), + artifact -> artifact + )); + + if (!remappedArtifactEntries.isEmpty() && !mappedUploadedArtifacts.isEmpty()) { + jobGraph.getUserArtifacts().clear(); + for (Map.Entry<String, Tuple2<String, DistributedCache.DistributedCacheEntry>> entry : remappedArtifactEntries.entrySet()) { + String fileName = entry.getKey(); + String dcEntryName = entry.getValue().f0; + DistributedCache.DistributedCacheEntry dcEntry = entry.getValue().f1; + + Path uploadedArtifact = mappedUploadedArtifacts.get(fileName); + if (uploadedArtifact != null) { + log.debug("Overwriting path {} for distributed-cache entry {} with {}.", dcEntry.filePath, dcEntryName, uploadedArtifact.toFile()); + jobGraph.addUserArtifact(dcEntryName, new DistributedCache.DistributedCacheEntry( + uploadedArtifact.toString(), dcEntry.isExecutable, dcEntry.isZipped)); + } else { + jobGraph.addUserArtifact(dcEntryName, dcEntry); + } + } + } + } + + private static String getDispatcherHost(DispatcherGateway gateway) { --- End diff -- Let's call it `getDispatcherHostname`
---