GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195067790
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -313,38 +305,32 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
                // we have to enable queued scheduling because slot will be allocated lazily
                jobGraph.setAllowQueuedScheduling(true);
     
    -           log.info("Requesting blob server port.");
    -           CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
    -
    -           CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
    -                   getDispatcherAddress(),
    -                   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
    -                           final int blobServerPort = response.port;
    -                           final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
    -                           final List<PermanentBlobKey> keys;
    -                           try {
    -                                   log.info("Uploading jar files.");
    -                                   keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
    -                                   jobGraph.uploadUserArtifacts(address, flinkConfig);
    -                           } catch (IOException ioe) {
    -                                   throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
    -                           }
    -
    -                           for (PermanentBlobKey key : keys) {
    -                                   jobGraph.addUserJarBlobKey(key);
    -                           }
    -
    -                           return jobGraph;
    -                   });
    -
    -           CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
    -                   (JobGraph jobGraphToSubmit) -> {
    -                           log.info("Submitting job graph.");
    -
    +           CompletableFuture<JobSubmitResponseBody> submissionFuture = getWebMonitorBaseUrl()
    +                   .thenCompose(webMonitorBaseUrl -> {
                                try {
    -                                   return sendRequest(
    +                                   jobGraph.zipUserArtifacts();
    +
    +                                   Collection<Path> localUserArtifacts = jobGraph.getUserArtifacts().values().stream()
    +                                           .map(entry -> new Path(entry.filePath))
    +                                           .filter(path -> {
    +                                                   try {
    +                                                           return !path.getFileSystem().isDistributedFS();
    +                                                   } catch (Exception e) {
    +                                                           log.warn("Could not determine whether {} is a local file. The file may not be accessible via the Distributed Cache.", path, e);
    +                                                           // filesystem isn't accessible from the client or FS class not present
    +                                                           return false;
    +                                                   }
    +                                           })
    +                                           .collect(Collectors.toList());
    +
    +                                   return restClient.sendRequest(
    --- End diff --
    
    Let's add retries for this call by adding a `sendRetriableRequest(...)` overload with the correct signature. Even better would be to add a `sendRequest(...)` overload with the correct signature that dispatches to `sendRetriableRequest(...)`.
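
    To make the suggestion concrete, here is a minimal, self-contained sketch of the pattern using only the JDK. It is not Flink's implementation: the class `RetryingClientSketch`, the `Supplier`-based signatures, the retry count of 3, and the choice to treat `IOException` as retryable are all assumptions made for illustration, and the sketch retries immediately with no delay between attempts.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for the client class; names and signatures are illustrative only.
final class RetryingClientSketch {

    /** Runs the request and re-issues it up to maxRetries more times while the failure is retryable. */
    static <T> CompletableFuture<T> sendRetriableRequest(
            Supplier<CompletableFuture<T>> request,
            Predicate<Throwable> isRetryable,
            int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(request, isRetryable, maxRetries, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> request,
            Predicate<Throwable> isRetryable,
            int retriesLeft,
            CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = request.get();
        } catch (Throwable t) {
            // Treat a synchronous failure like an asynchronous one.
            current = new CompletableFuture<>();
            current.completeExceptionally(t);
        }
        current.whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
                return;
            }
            // Dependent stages wrap the original error in a CompletionException.
            Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                    ? failure.getCause()
                    : failure;
            if (retriesLeft > 0 && isRetryable.test(cause)) {
                attempt(request, isRetryable, retriesLeft - 1, result);
            } else {
                result.completeExceptionally(cause);
            }
        });
    }

    /** Caller-facing entry point that dispatches to the retrying variant (assumed policy). */
    static <T> CompletableFuture<T> sendRequest(Supplier<CompletableFuture<T>> request) {
        return sendRetriableRequest(request, t -> t instanceof IOException, 3);
    }
}
```

    With this shape, call sites keep calling `sendRequest(...)` and get retries without knowing about them, which is the point of the second half of the suggestion. A real client would probably also want a delay or backoff between attempts.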

