Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195089124

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---
    @@ -65,7 +86,96 @@ public JobSubmitHandler(
                    e);
            }

    -       return gateway.submitJob(jobGraph, timeout)
    +       updateJarEntriesInJobGraph(jobGraph, requestBody.getUploadedJars(), log);
    +       updateUserArtifactEntriesInJobGraph(jobGraph, requestBody.getUploadedArtifacts(), log);
    +
    +       CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
    +
    +       CompletableFuture<JobGraph> jobGraphFuture = blobServerPortFuture.thenApply(blobServerPort -> {
    +           final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
    +           final List<PermanentBlobKey> keys;
    +           try {
    +               keys = BlobClient.uploadFiles(address, config, jobGraph.getJobID(), jobGraph.getUserJars());
    +               jobGraph.uploadUserArtifacts(address, config);
    --- End diff --

    Instead of calling `updateUserArtifactEntriesInJobGraph` and then `jobGraph.uploadUserArtifacts`, we could simply take `requestBody.getUploadedArtifacts`, upload the artifacts to the `BlobServer`, and add the resulting blob keys to the `JobGraph` so that it knows where to retrieve the user artifacts from.
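
    A rough sketch of the single-pass flow suggested here, under stated assumptions: every type below (`BlobKey`, `InMemoryBlobStore`, `JobGraphStub`) and the method `uploadArtifactsAndRegisterKeys` are hypothetical stand-ins invented for illustration, not Flink classes or APIs. The real handler would use the `BlobServer` and `JobGraph` instead. The point is only that each uploaded artifact is stored once and its key is recorded on the graph in the same step, with no intermediate path rewrite.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: all types here are hypothetical stand-ins, not Flink classes.
final class ArtifactUploadSketch {

    /** Stand-in for a blob key returned by the blob store. */
    static final class BlobKey {
        final String id;

        BlobKey(String id) {
            this.id = id;
        }
    }

    /** Stand-in for the BlobServer: keeps bytes in memory and hands out keys. */
    static final class InMemoryBlobStore {
        private final Map<String, byte[]> blobs = new LinkedHashMap<>();

        BlobKey put(byte[] content) {
            String id = "blob-" + blobs.size();
            blobs.put(id, content.clone());
            return new BlobKey(id);
        }

        byte[] get(BlobKey key) {
            return blobs.get(key.id);
        }

        int size() {
            return blobs.size();
        }
    }

    /** Stand-in for the JobGraph: only remembers artifact name -> blob key. */
    static final class JobGraphStub {
        final Map<String, BlobKey> artifactBlobKeys = new LinkedHashMap<>();

        void addUserArtifactBlobKey(String name, BlobKey key) {
            artifactBlobKeys.put(name, key);
        }
    }

    /**
     * Uploads each artifact taken from the request and records its blob key
     * on the job graph in the same pass.
     */
    static void uploadArtifactsAndRegisterKeys(
            Map<String, byte[]> uploadedArtifacts,
            InMemoryBlobStore store,
            JobGraphStub jobGraph) {
        for (Map.Entry<String, byte[]> artifact : uploadedArtifacts.entrySet()) {
            BlobKey key = store.put(artifact.getValue());
            jobGraph.addUserArtifactBlobKey(artifact.getKey(), key);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> uploaded = new LinkedHashMap<>();
        uploaded.put("notes.txt", "hello".getBytes(StandardCharsets.UTF_8));

        InMemoryBlobStore store = new InMemoryBlobStore();
        JobGraphStub jobGraph = new JobGraphStub();
        uploadArtifactsAndRegisterKeys(uploaded, store, jobGraph);

        // The graph now knows which key to ask the store for.
        for (Map.Entry<String, BlobKey> entry : jobGraph.artifactBlobKeys.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().id);
        }
    }
}
```

    In the actual handler this loop would presumably run inside the `thenApply` callback, once the blob server address is known.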
---