[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523838#comment-16523838
 ] 

ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r198178787
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -315,42 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
                // we have to enable queued scheduling because slot will be allocated lazily
                jobGraph.setAllowQueuedScheduling(true);
     
    -           log.info("Requesting blob server port.");
    -           CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
    +           CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync(
    +                   () -> {
    +                           log.info("Submitting job graph.");
    +
    +                           List<String> jarFileNames = new ArrayList<>(8);
    +                           List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
    +                           Collection<FileUpload> filesToUpload = new ArrayList<>(8);
     
    -           CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
    -                   getDispatcherAddress(),
    -                   (BlobServerPortResponseBody response, String dispatcherAddress) -> {
    -                           final int blobServerPort = response.port;
    -                           final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
    -                           final List<PermanentBlobKey> keys;
    +                           // TODO: need configurable location
    +                           final java.nio.file.Path jobGraphFile;
                                try {
    -                                   log.info("Uploading jar files.");
    -                                   keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
    -                                   jobGraph.uploadUserArtifacts(address, flinkConfig);
    -                           } catch (IOException ioe) {
    -                                   throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
    +                                   jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
    +                                   try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
    +                                           try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
    +                                                   objectOut.writeObject(jobGraph);
    +                                           }
    +                                   }
    +                                   filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
    +                           } catch (IOException e) {
    +                                   throw new RuntimeException("lol", e);
    --- End diff --
    
    needs a proper exception
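
    One possible shape for this, sketched outside of Flink: keep the pattern
    the removed lines already used, i.e. wrap a descriptive checked exception
    in a CompletionException so the future fails with a meaningful cause.
    The class name JobGraphWriteSketch, the method writeJobGraph, the message
    text, and the JobSubmissionException type are all hypothetical;
    JobSubmissionException only stands in for FlinkException so that the
    snippet compiles without Flink on the classpath. This is an illustration
    under those assumptions, not the change that was actually made in the PR.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class JobGraphWriteSketch {

    /** Hypothetical stand-in for FlinkException (assumption, not Flink API). */
    public static class JobSubmissionException extends Exception {
        public JobSubmissionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Serializes the given object to a temporary file asynchronously.
     * The parameter is a plain Serializable here; in the diff it is the JobGraph.
     */
    public static CompletableFuture<Path> writeJobGraph(Serializable jobGraph) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
                try (OutputStream fileOut = Files.newOutputStream(jobGraphFile);
                        ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
                    objectOut.writeObject(jobGraph);
                }
                return jobGraphFile;
            } catch (IOException e) {
                // A descriptive message plus the original cause, instead of
                // an uninformative RuntimeException.
                throw new CompletionException(
                    new JobSubmissionException("Failed to serialize JobGraph.", e));
            }
        });
    }
}
```

    A caller that joins on the returned future would then see a
    CompletionException whose cause carries the message and the underlying
    IOException.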


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blob server before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blob server and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
