[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527828#comment-16527828 ]
ASF GitHub Bot commented on FLINK-9280: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6203#discussion_r199198055 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -71,20 +112,131 @@ public void testSerializationFailureHandling() throws Exception { @Test public void testSuccessfulJobSubmission() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class); JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, - Collections.emptyMap()); + Collections.emptyMap(), + TestingUtils.defaultExecutor()); - JobGraph job = new JobGraph("testjob"); - JobSubmitRequestBody request = new JobSubmitRequestBody(job); + JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList()); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway) .get(); } + + @Test + public void testRejectionOnCountMismatch() throws Exception { + final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath(); + try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) { + try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) { + objectOut.writeObject(new JobGraph("testjob")); + } + } + final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); + + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getHostname()).thenReturn("localhost"); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(blobServer.getPort())); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); --- End diff -- This is duplicate code, can we avoid it? > Extend JobSubmitHandler to accept jar files > ------------------------------------------- > > Key: FLINK-9280 > URL: https://issues.apache.org/jira/browse/FLINK-9280 > Project: Flink > Issue Type: New Feature > Components: Job-Submission, REST > Affects Versions: 1.5.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.0, 1.5.1 > > > The job submission through the CLI first uploads all require jars to the blob > server, sets the blob keys in the jobgraph, and then uploads this graph to > The {{JobSubmitHandler}} which submits it to the Dispatcher. > This process has the downside that it requires jars to be uploaded to the > blobserver before submitting the job graph, which does not happen via REST. > I propose an extension to the the {{JobSubmitHandler}} to also accept an > optional list of jar files, that were previously uploaded through the > {{JarUploadHandler}}. If present, the handler would upload these jars to the > blobserver and set the blob keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)