Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6147#discussion_r195067790 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -313,38 +305,32 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance()); - - CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); - final List<PermanentBlobKey> keys; - try { - log.info("Uploading jar files."); - keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); - jobGraph.uploadUserArtifacts(address, flinkConfig); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not upload job files.", ioe)); - } - - for (PermanentBlobKey key : keys) { - jobGraph.addUserJarBlobKey(key); - } - - return jobGraph; - }); - - CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); - + CompletableFuture<JobSubmitResponseBody> submissionFuture = getWebMonitorBaseUrl() + .thenCompose(webMonitorBaseUrl -> { try { - return sendRequest( + jobGraph.zipUserArtifacts(); + + Collection<Path> localUserArtifacts = jobGraph.getUserArtifacts().values().stream() + .map(entry -> new Path(entry.filePath)) + .filter(path -> { + try { + return !path.getFileSystem().isDistributedFS(); + } catch (Exception e) { + log.warn("Could not determine 
whether {} is a local file. The file may not be accessible via the Distributed Cache.", path, e); + // filesystem isn't accessible from the client or FS class not present + return false; + } + }) + .collect(Collectors.toList()); + + return restClient.sendRequest( --- End diff -- Let's add retries for this call by adding a `sendRetriableRequest(...)` with the correct signature. Even better would be to add a `sendRequest(...)` with the correct signature that dispatches to `sendRetriableRequest`.
---