[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527815#comment-16527815 ]
ASF GitHub Bot commented on FLINK-9280: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199187606 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java --- @@ -34,38 +40,137 @@ import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> { + private static final String FILE_TYPE_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture<String> localRestAddress, GatewayRetriever<? extends DispatcherGateway> leaderRetriever, Time timeout, - Map<String, String> headers) { + Map<String, String> headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + Collection<File> uploadedFiles = request.getUploadedFiles(); + Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + path -> path.toPath().getFileName().toString(), + File::toPath + )); + + if (uploadedFiles.size() != nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); + } + + JobSubmitRequestBody requestBody = request.getRequestBody(); + + Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, nameToFile); --- End diff -- maybe make `final`. > Extend JobSubmitHandler to accept jar files > ------------------------------------------- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST > Affects Versions: 1.5.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all require jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > The {{JobSubmitHandler}} which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the the {{JobSubmitHandler}} to also accept an > optional list of jar files, that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)